下面列出了如何使用 org.springframework.util.concurrent.ListenableFuture 的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Issues an asynchronous GET for composer 42 against the mock server and verifies
 * that the expected request was made. The response body is mocked and never inspected.
 */
@Test
public void performGet() throws Exception {
    String composerJson = "{\"name\" : \"Ludwig van Beethoven\", \"someDouble\" : \"1.6035\"}";

    this.mockServer
            .expect(requestTo("/composers/42"))
            .andExpect(method(HttpMethod.GET))
            .andRespond(withSuccess(composerJson, MediaType.APPLICATION_JSON));

    // Only the outgoing request is validated; the mocked response would satisfy:
    // person.getName().equals("Ludwig van Beethoven")
    // person.getDouble().equals(1.6035)
    @SuppressWarnings("unused")
    ListenableFuture<ResponseEntity<Person>> composerFuture =
            this.restTemplate.getForEntity("/composers/{id}", Person.class, 42);

    this.mockServer.verify();
}
/**
 * Blocks until the given future completes (successfully or with a failure) or until
 * one second has elapsed, whichever happens first. The outcome itself is not inspected.
 *
 * @param convertedFuture the future to wait on
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@Override
public void waitForCalculationToFinish(ListenableFuture<String> convertedFuture) throws InterruptedException {
    final CountDownLatch completed = new CountDownLatch(1);

    // Both outcomes release the latch; only completion matters here.
    ListenableFutureCallback<String> releaseLatch = new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable error) {
            completed.countDown();
        }

        @Override
        public void onSuccess(String value) {
            completed.countDown();
        }
    };
    convertedFuture.addCallback(releaseLatch);

    completed.await(1, TimeUnit.SECONDS);
}
/**
 * Builds three equivalent invocations that POST the same user to the users resource of
 * department 1: one with a varargs URI variable, one with a map of URI variables and one
 * with a pre-built {@link URI}.
 *
 * @return the three invocations, each accepting the {@link AsyncRestOperations} to run against
 */
static Iterable<Function<AsyncRestOperations, ListenableFuture<User>>> execute() {
    final ObjectMapper json = new ObjectMapper();

    // Reads the response body back into a User.
    final ResponseExtractor<User> readUser = response -> json.readValue(response.getBody(), User.class);

    // Adds the marker header, declares JSON content and serializes the user into the request body.
    final AsyncRequestCallback writeUser = request -> {
        request.getHeaders().add("Test", "true");
        request.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        json.writeValue(request.getBody(), new User("D. Fault", "1984-09-13"));
    };

    final Function<AsyncRestOperations, ListenableFuture<User>> withVarargs =
            ops -> ops.execute("/departments/{id}/users", POST, writeUser, readUser, 1);
    final Function<AsyncRestOperations, ListenableFuture<User>> withVariableMap =
            ops -> ops.execute("/departments/{id}/users", POST, writeUser, readUser, singletonMap("id", 1));
    final Function<AsyncRestOperations, ListenableFuture<User>> withUri =
            ops -> ops.execute(URI.create("/departments/1/users"), POST, writeUser, readUser);

    return Arrays.asList(withVarargs, withVariableMap, withUri);
}
/**
 * Asynchronous insert using callbacks: the future's success and failure callbacks both
 * release a latch, after which the row is read back and compared with the inserted user.
 */
@Test
public void insertAsynchronously() throws InterruptedException {
    User walter = new User();
    walter.setId(42L);
    walter.setUsername("heisenberg");
    walter.setFirstname("Walter");
    walter.setLastname("White");

    final CountDownLatch insertFinished = new CountDownLatch(1);
    AsyncCassandraTemplate asyncTemplate = new AsyncCassandraTemplate(session);

    ListenableFuture<User> insertFuture = asyncTemplate.insert(walter);
    insertFuture.addCallback(
            saved -> insertFinished.countDown(),
            failure -> insertFinished.countDown());

    // Wait at most five seconds for either callback to fire.
    insertFinished.await(5, TimeUnit.SECONDS);

    User reloaded = template.selectOneById(walter.getId(), User.class);
    assertThat(reloaded).isEqualTo(walter);
}
/**
 * Starts an XHR-based SockJS client session for the given transport request.
 *
 * <p>A new {@link XhrClientSockJsSession} is created, its timeout task is registered with
 * the request, and the handshake headers are assembled from {@code getRequestHeaders()}
 * followed by the request's own handshake headers (applied last via {@code putAll}).
 * The actual connection work is delegated to {@code connectInternal}.
 *
 * @param request the transport request providing the transport URL and handshake headers
 * @param handler the handler handed to the new session
 * @return the future that is shared with the session and {@code connectInternal};
 *         presumably completed by them once the connection attempt finishes
 */
@Override
@SuppressWarnings("deprecation")
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
    SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
    XhrClientSockJsSession session = new XhrClientSockJsSession(request, handler, this, connectFuture);
    request.addTimeoutTask(session.getTimeoutTask());

    URI receiveUrl = request.getTransportUrl();
    if (logger.isDebugEnabled()) {
        // Leading space before "session" keeps the mode and the word apart in the log line.
        logger.debug("Starting XHR " +
                (isXhrStreamingDisabled() ? "Polling" : "Streaming") + " session url=" + receiveUrl);
    }

    HttpHeaders handshakeHeaders = new HttpHeaders();
    handshakeHeaders.putAll(getRequestHeaders());
    handshakeHeaders.putAll(request.getHandshakeHeaders());

    connectInternal(request, handler, receiveUrl, handshakeHeaders, session, connectFuture);
    return connectFuture;
}
/**
 * Starts the WebSocket handshake against {@code getUri()} and registers a callback that
 * stores the resulting session on success and logs an error on failure.
 */
@Override
protected void openConnection() {
    if (logger.isInfoEnabled()) {
        logger.info("Connecting to WebSocket at " + getUri());
    }

    ListenableFutureCallback<WebSocketSession> handshakeOutcome =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onFailure(Throwable ex) {
                    logger.error("Failed to connect", ex);
                }

                @Override
                public void onSuccess(WebSocketSession result) {
                    webSocketSession = result;
                    logger.info("Successfully connected");
                }
            };

    ListenableFuture<WebSocketSession> handshake =
            this.client.doHandshake(this.webSocketHandler, this.headers, getUri());
    handshake.addCallback(handshakeOutcome);
}
/**
 * Routes the request through the load balancer, using the host part of the request URI
 * as the service name. Once the load balancer supplies a service instance, the request is
 * wrapped in a {@code ServiceRequestWrapper} and passed on to the given execution.
 *
 * @param request the outgoing request; its URI host is used as the service name
 * @param body the request body, forwarded unchanged
 * @param execution the execution that continues processing with the wrapped request
 * @return whatever the load balancer returns for the executed request
 * @throws IOException as declared by the interceptor contract
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
        final byte[] body, final AsyncClientHttpRequestExecution execution)
        throws IOException {
    final String serviceName = request.getURI().getHost();

    final LoadBalancerRequest<ListenableFuture<ClientHttpResponse>> forwardToInstance =
            new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
                @Override
                public ListenableFuture<ClientHttpResponse> apply(
                        final ServiceInstance instance) throws Exception {
                    HttpRequest wrapped = new ServiceRequestWrapper(request,
                            instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
                    return execution.executeAsync(wrapped, body);
                }
            };

    return this.loadBalancer.execute(serviceName, forwardToInstance);
}
/**
 * POSTs a plain-text entity with a custom header through {@code exchange} and checks, in a
 * callback, that the response carries the expected Location header and no body.
 */
@Test
public void exchangePostCallback() throws Exception {
    HttpHeaders headers = new HttpHeaders();
    headers.set("MyHeader", "MyValue");
    headers.setContentType(MediaType.TEXT_PLAIN);
    HttpEntity<String> postEntity = new HttpEntity<>(helloWorld, headers);

    ListenableFuture<ResponseEntity<Void>> responseFuture =
            template.exchange(baseUrl + "/{method}", HttpMethod.POST, postEntity, Void.class, "post");

    final URI expectedLocation = new URI(baseUrl + "/post/1");
    ListenableFutureCallback<ResponseEntity<Void>> checkResponse =
            new ListenableFutureCallback<ResponseEntity<Void>>() {
                @Override
                public void onFailure(Throwable ex) {
                    fail(ex.getMessage());
                }

                @Override
                public void onSuccess(ResponseEntity<Void> result) {
                    assertEquals("Invalid location", expectedLocation, result.getHeaders().getLocation());
                    assertFalse(result.hasBody());
                }
            };
    responseFuture.addCallback(checkResponse);

    waitTillDone(responseFuture);
}
/**
 * Checks that an {@code AsyncResult} created from a value hands that same instance to a
 * registered success callback, to {@code get()} and to its {@code completable()} view.
 */
@Test
public void asyncResultWithCallbackAndValue() throws Exception {
    String expected = "val";
    final Set<String> received = new HashSet<>(1);

    ListenableFuture<String> asyncResult = AsyncResult.forValue(expected);
    ListenableFutureCallback<String> recordValue = new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable ex) {
            fail("Failure callback not expected: " + ex);
        }

        @Override
        public void onSuccess(String result) {
            received.add(result);
        }
    };
    asyncResult.addCallback(recordValue);

    // The callback must already have run by the time addCallback returns.
    assertSame(expected, received.iterator().next());
    assertSame(expected, asyncResult.get());
    assertSame(expected, asyncResult.completable().get());
    asyncResult.completable().thenAccept(v -> assertSame(expected, v));
}
/**
 * Calls the {@code add} endpoint with URI template variables 3 and 4, first synchronously
 * and then through the async template, expecting 7 both times.
 */
@Test
public void getsEndWithRequestVariables() throws Exception {
    // Synchronous call.
    int syncSum = restTemplate.getForObject(controllerUrl + "add?a={a}&b={b}", Integer.class, 3, 4);
    assertThat(syncSum, is(7));

    // Same call through the asynchronous template.
    ListenableFuture<ResponseEntity<Integer>> sumFuture =
            asyncRestTemplate.getForEntity(controllerUrl + "add?a={a}&b={b}", Integer.class, 3, 4);
    ResponseEntity<Integer> asyncSum = sumFuture.get();
    assertThat(asyncSum.getBody(), is(7));
}
/**
 * Publishes the driver message to Kafka, keyed by the UTF-8 bytes of its driver id.
 * A failed send is logged as a warning; a successful send is ignored.
 *
 * @param driver the inbound message to publish
 */
@Override
public void accept(InboundDriverMessage driver) {
    ListenableFutureCallback<SendResult<Object, Object>> warnOnFailure =
            new ListenableFutureCallback<SendResult<Object, Object>>() {
                @Override
                public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                    // nothing to do on success
                }

                @Override
                public void onFailure(Throwable throwable) {
                    log.warn("Failed sending an event to kafka", throwable);
                }
            };

    byte[] key = driver.getDriverId().getBytes(StandardCharsets.UTF_8);
    ListenableFuture<SendResult<Object, Object>> sendFuture =
            kafkaTemplate.send(topic, key, driver.toByteArray());
    sendFuture.addCallback(warnOnFailure);
}
/**
 * Hands the request to the next interceptor from the iterator, or, when none is left,
 * creates the actual request from the request factory, copies headers and body onto it
 * and executes it.
 *
 * @param request the request to execute
 * @param body the request body; copied to the delegate only when non-empty
 * @return the future response from the next interceptor or from the delegate request
 * @throws IOException if creating or writing the delegate request fails
 */
@Override
public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body)
        throws IOException {
    if (!this.iterator.hasNext()) {
        // End of the chain: build and run the real request.
        AsyncClientHttpRequest delegate =
                requestFactory.createAsyncRequest(request.getURI(), request.getMethod());
        delegate.getHeaders().putAll(request.getHeaders());
        if (body.length > 0) {
            StreamUtils.copy(body, delegate.getBody());
        }
        return delegate.executeAsync();
    }

    AsyncClientHttpRequestInterceptor next = this.iterator.next();
    return next.intercept(request, body, this);
}
/**
 * Posts the plain-text body {@code a=1} to the {@code textPlain} endpoint, synchronously
 * and asynchronously, and expects the decoded response to equal the posted body.
 */
@Test
public void ableToConsumeTextPlain() throws Exception {
    String payload = "a=1";

    // Synchronous round trip.
    String syncResponse = restTemplate.postForObject(codeFirstUrl + "textPlain", payload, String.class);
    assertThat(jsonOf(syncResponse, String.class), is(payload));

    // Asynchronous round trip with the same payload wrapped in an entity.
    HttpEntity<?> payloadEntity = new HttpEntity<>(payload);
    ListenableFuture<ResponseEntity<String>> responseFuture =
            asyncRestTemplate.postForEntity(codeFirstUrl + "textPlain", payloadEntity, String.class);
    ResponseEntity<String> asyncResponse = responseFuture.get();
    assertThat(jsonOf(asyncResponse.getBody(), String.class), is(payload));
}
/**
 * Sends an OPTIONS request to the URL template expanded with the given variables and
 * adapts the resulting headers future through {@code adaptToAllowHeader}.
 *
 * @param url the URL template
 * @param uriVars the variables used to expand the template
 * @return a future for the set of HTTP methods derived from the response headers
 * @throws RestClientException as declared by the operations contract
 */
@Override
public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVars)
        throws RestClientException {
    ListenableFuture<HttpHeaders> headersFuture =
            execute(url, HttpMethod.OPTIONS, null, headersExtractor(), uriVars);
    return adaptToAllowHeader(headersFuture);
}
/**
 * Waits for the load job to finish and builds the {@code index} view with a status message.
 *
 * @param loadJob the future of the running load job; only its completion is awaited,
 *        the resulting job object is not used
 * @param tableName the table name included in the success message
 * @return the {@code index} view carrying {@code datasetName} and either a success or an
 *         error message
 */
private ModelAndView getResponse(ListenableFuture<Job> loadJob, String tableName) {
    String message;
    try {
        // Block until the job completes; the result itself is not needed.
        loadJob.get();
        message = "Successfully loaded data file to " + tableName;
    }
    catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        message = "Error: " + e.getMessage();
    }
    return new ModelAndView("index")
            .addObject("datasetName", this.datasetName)
            .addObject("message", message);
}
/**
 * Delegates to the return value handler selected for the given return type, which must be
 * an {@code AsyncHandlerMethodReturnValueHandler}.
 *
 * @param returnValue the value returned from the handler method
 * @param returnType the declared return type used to select the handler
 * @return the future produced by the selected handler (may be {@code null})
 * @throws IllegalStateException presumably raised by {@code Assert.state} when the selected
 *         handler is not an async handler
 */
@Override
@Nullable
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
    HandlerMethodReturnValueHandler selected = getReturnValueHandler(returnType);
    Assert.state(selected instanceof AsyncHandlerMethodReturnValueHandler,
            "AsyncHandlerMethodReturnValueHandler required");

    AsyncHandlerMethodReturnValueHandler asyncHandler = (AsyncHandlerMethodReturnValueHandler) selected;
    return asyncHandler.toListenableFuture(returnValue, returnType);
}
/**
 * Registers four expectations for GET {@code /number} but performs only two requests,
 * then checks that {@code mockServer.verify()} fails and reports the two unsatisfied
 * expectations.
 */
@Test
public void verify() {
    this.mockServer.expect(requestTo("/number")).andExpect(method(HttpMethod.GET))
            .andRespond(withSuccess("1", MediaType.TEXT_PLAIN));
    this.mockServer.expect(requestTo("/number")).andExpect(method(HttpMethod.GET))
            .andRespond(withSuccess("2", MediaType.TEXT_PLAIN));
    this.mockServer.expect(requestTo("/number")).andExpect(method(HttpMethod.GET))
            .andRespond(withSuccess("4", MediaType.TEXT_PLAIN));
    this.mockServer.expect(requestTo("/number")).andExpect(method(HttpMethod.GET))
            .andRespond(withSuccess("8", MediaType.TEXT_PLAIN));

    @SuppressWarnings("unused")
    ListenableFuture<ResponseEntity<String>> result = this.restTemplate.getForEntity("/number", String.class);
    // result == "1"
    result = this.restTemplate.getForEntity("/number", String.class);
    // result == "2"

    // Track whether verify() actually failed so the test cannot pass vacuously.
    boolean verificationFailed = false;
    try {
        this.mockServer.verify();
    }
    catch (AssertionError error) {
        verificationFailed = true;
        assertTrue(error.getMessage(), error.getMessage().contains("2 unsatisfied expectation(s)"));
    }
    assertTrue("Expected verify() to report unsatisfied expectations", verificationFailed);
}
/**
 * Pulls messages asynchronously via {@code pullAsync} and exposes them, converted to the
 * requested payload type, through a new future. A failed pull is propagated as that
 * future's exception.
 *
 * @param subscription passed through to {@code pullAsync}
 * @param maxMessages passed through to {@code pullAsync}
 * @param returnImmediately passed through to {@code pullAsync}
 * @param payloadType the payload type the pulled messages are converted to
 * @return a future completed with the converted messages or with the pull failure
 */
@Override
public <T> ListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription,
        Integer maxMessages, Boolean returnImmediately, Class<T> payloadType) {
    final SettableListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> converted =
            new SettableListenableFuture<>();

    this.pullAsync(subscription, maxMessages, returnImmediately).addCallback(
            pulled -> {
                converted.set(this.toConvertedAcknowledgeablePubsubMessages(payloadType, pulled));
            },
            failure -> {
                converted.setException(failure);
            });

    return converted;
}
/**
 * Registers a {@code ThreadPoolTaskExecutor} bean named {@code e1} and checks that the
 * unqualified async method runs on a different, non-{@code e1} thread while the two
 * qualified methods run on threads whose names start with {@code e1-}.
 */
@Test
public void qualifiedAsyncMethodsAreRoutedToCorrectExecutor() throws InterruptedException, ExecutionException {
    DefaultListableBeanFactory factory = new DefaultListableBeanFactory();
    factory.registerBeanDefinition("e1", new RootBeanDefinition(ThreadPoolTaskExecutor.class));
    AnnotationAsyncExecutionAspect.aspectOf().setBeanFactory(factory);

    ClassWithQualifiedAsyncMethods target = new ClassWithQualifiedAsyncMethods();

    // Unqualified method: some other thread, but not one from the "e1" pool.
    Future<Thread> unqualified = target.defaultWork();
    assertThat(unqualified.get(), not(Thread.currentThread()));
    assertThat(unqualified.get().getName(), not(startsWith("e1-")));

    // Qualified methods: both futures must report an "e1-" thread.
    ListenableFuture<Thread> qualifiedListenable = target.e1Work();
    assertThat(qualifiedListenable.get().getName(), startsWith("e1-"));

    CompletableFuture<Thread> qualifiedCompletable = target.e1OtherWork();
    assertThat(qualifiedCompletable.get().getName(), startsWith("e1-"));
}
/**
 * Wraps a headers future in an adapter whose result is the Location header of those headers.
 *
 * @param future the future supplying the response headers
 * @return a future for {@code headers.getLocation()}
 */
private static ListenableFuture<URI> adaptToLocationHeader(ListenableFuture<HttpHeaders> future) {
    ListenableFutureAdapter<URI, HttpHeaders> locationAdapter =
            new ListenableFutureAdapter<URI, HttpHeaders>(future) {
                @Override
                @Nullable
                protected URI adapt(HttpHeaders responseHeaders) throws ExecutionException {
                    return responseHeaders.getLocation();
                }
            };
    return locationAdapter;
}
/**
 * Calls the {@code sayhi} endpoint with {@code name=world}, synchronously and
 * asynchronously, and expects the decoded greeting {@code "hi world [world]"} both times.
 */
@Test
public void ensureServerWorksFine() throws Exception {
    // Synchronous call.
    String syncGreeting = restTemplate.getForObject(controllerUrl + "sayhi?name=world", String.class);
    assertThat(jsonOf(syncGreeting, String.class), is("hi world [world]"));

    // Same call through the asynchronous template.
    ListenableFuture<ResponseEntity<String>> greetingFuture =
            asyncRestTemplate.getForEntity(controllerUrl + "sayhi?name=world", String.class);
    ResponseEntity<String> asyncGreeting = greetingFuture.get();
    assertThat(jsonOf(asyncGreeting.getBody(), String.class), is("hi world [world]"));
}
/**
 * Builds the AOP advice that logs the arguments and the result of each intercepted call.
 *
 * <p>Arguments are logged as a JSON map keyed by the reflective parameter name. For methods
 * that return a value, a {@link ListenableFuture} result is resolved with a blocking
 * {@code get()} and the resolved value is logged; any other result, including {@code null},
 * is logged as-is. Methods declared as {@code void} (or {@code Void}) only get a note that
 * there is no return value. The invocation's result is always returned unchanged.
 *
 * @return a {@link MethodInterceptor} performing the logging described above
 * @author Frodez
 * @date 2019-05-10
 */
@SuppressWarnings("unchecked")
@Override
public Advice getAdvice() {
    return (MethodInterceptor) invocation -> {
        Method method = invocation.getMethod();
        String name = ReflectUtil.fullName(method);
        if (method.getParameterCount() != 0) {
            Parameter[] parameters = method.getParameters();
            Object[] args = invocation.getArguments();
            Map<String, Object> paramMap = new HashMap<>(parameters.length);
            for (int i = 0; i < parameters.length; ++i) {
                paramMap.put(parameters[i].getName(), args[i]);
            }
            log.info("{} 请求参数:{}", name, JSONUtil.string(paramMap));
        } else {
            log.info("{} 本方法无入参", name);
        }
        Object result = invocation.proceed();
        // Method#getReturnType() yields the primitive void.class for void methods;
        // Void.class only matches methods declared with the boxed type.
        Class<?> returnType = method.getReturnType();
        if (returnType != void.class && returnType != Void.class) {
            if (result instanceof ListenableFuture) {
                // Blocks until the future completes so the resolved value can be logged.
                log.info("{} 返回值:{}", name, JSONUtil.string(((ListenableFuture<Result>) result).get()));
            } else {
                // null or a non-future result: log it directly instead of failing on the cast.
                log.info("{} 返回值:{}", name, JSONUtil.string(result));
            }
        } else {
            log.info("{} 本方法返回值类型为void", name);
        }
        return result;
    };
}
/**
 * Posts an ISO-8859-1 plain-text entity via {@code postForLocation} and checks, using
 * lambda callbacks, that the returned location matches the expected URI.
 */
@Test
public void postForLocationCallbackWithLambdas() throws Exception {
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(new MediaType("text", "plain", StandardCharsets.ISO_8859_1));
    HttpEntity<String> postEntity = new HttpEntity<>(helloWorld, headers);

    final URI expectedLocation = new URI(baseUrl + "/post/1");

    ListenableFuture<URI> future = template.postForLocation(baseUrl + "/{method}", postEntity, "post");
    future.addCallback(
            location -> assertEquals("Invalid location", expectedLocation, location),
            failure -> fail(failure.getMessage()));

    waitTillDone(future);
}
/**
 * Executes the request for an already-built URI by delegating directly to {@code doExecute}.
 *
 * @param url the target URI
 * @param method the HTTP method to use
 * @param requestCallback optional callback passed through to {@code doExecute}
 * @param responseExtractor optional extractor passed through to {@code doExecute}
 * @return the future returned by {@code doExecute}
 * @throws RestClientException as declared by the operations contract
 */
@Override
public <T> ListenableFuture<T> execute(
        URI url,
        HttpMethod method,
        @Nullable AsyncRequestCallback requestCallback,
        @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

    ListenableFuture<T> result = doExecute(url, method, requestCallback, responseExtractor);
    return result;
}
/**
 * Executes the request for a URL template with varargs URI variables by delegating to the
 * capture-based {@code execute} overload: the callback is turned into an entity via
 * {@code toEntity}, and the extractor is wired to a fresh {@code Capture} via
 * {@code extractTo} and {@code route}.
 *
 * @param url the URL template
 * @param method the HTTP method to use
 * @param callback optional request callback
 * @param extractor optional response extractor
 * @param uriVariables the variables used to expand the template
 * @return the future returned by the delegate overload
 */
@Override
public <T> ListenableFuture<T> execute(final String url, final HttpMethod method,
        @Nullable final AsyncRequestCallback callback, @Nullable final ResponseExtractor<T> extractor,
        final Object... uriVariables) {

    // The same capture instance is shared between the route and the delegate call.
    final Capture<T> captured = Capture.empty();
    return execute(
            url,
            method,
            toEntity(callback),
            route(extractTo(extractor, captured)),
            captured,
            uriVariables);
}
/**
 * Requests the logs of a container and streams them to the watcher supplied in the argument
 * (or to a no-op consumer when none is given).
 *
 * <p>If neither stderr nor stdout is requested, both are enabled. An
 * {@code HttpStatusCodeException} is passed to {@code processStatusCodeException} rather
 * than propagated.
 *
 * @param arg the log request: container id, stream selection, follow/since/tail/timestamps
 *        options, optional watcher and interrupter
 * @return the call result filled in by {@code waitFuture} or {@code processStatusCodeException}
 */
@Override
public ServiceCallResult getContainerLog(GetLogContainerArg arg) {
    ServiceCallResult callResult = new ServiceCallResult();
    final Consumer<ProcessEvent> watcher = firstNonNull(arg.getWatcher(), Consumers.<ProcessEvent>nop());

    // At least one stream is required (usually both), so fall back to both when none is selected.
    final boolean wantStderr = arg.isStderr();
    final boolean wantStdout = arg.isStdout();
    final boolean noneSelected = !(wantStderr || wantStdout);
    final boolean stderr = noneSelected || wantStderr;
    final boolean stdout = noneSelected || wantStdout;

    URI url = getUrlContainer(arg.getId(), "logs")
            .queryParam("stderr", stderr)
            .queryParam("stdout", stdout)
            .queryParam("follow", arg.isFollow())
            .queryParam("since", arg.getSince())
            .queryParam("tail", arg.getTail())
            .queryParam("timestamps", arg.isTimestamps())
            .build()
            .toUri();

    try {
        ListenableFuture<Object> streaming = restTemplate.execute(url, HttpMethod.GET, null, response -> {
            StreamContext<ProcessEvent> context = new StreamContext<>(response.getBody(), watcher);
            context.getInterrupter().setFuture(arg.getInterrupter());
            frameStreamProcessor.processResponseStream(context);
            return null;
        });
        waitFuture(callResult, streaming);
    } catch (HttpStatusCodeException statusError) {
        processStatusCodeException(statusError, callResult, url);
    }
    return callResult;
}
/**
 * Issues a HEAD request for {@code /get} and checks, using lambda callbacks, that the
 * returned headers contain a Content-Type entry.
 */
@Test
public void headForHeadersCallbackWithLambdas() throws Exception {
    ListenableFuture<HttpHeaders> future = template.headForHeaders(baseUrl + "/get");

    future.addCallback(
            headers -> assertTrue("No Content-Type header", headers.containsKey("Content-Type")),
            failure -> fail(failure.getMessage()));

    waitTillDone(future);
}
/**
 * Sends a HEAD request to {@code /get}; the success callback asserts that a Content-Type
 * header is present and the failure callback fails the test with the error message.
 */
@Test
public void headForHeadersCallbackWithLambdas() throws Exception {
    ListenableFuture<HttpHeaders> responseHeaders = template.headForHeaders(baseUrl + "/get");

    responseHeaders.addCallback(
            received -> assertTrue("No Content-Type header", received.containsKey("Content-Type")),
            error -> fail(error.getMessage()));

    waitTillDone(responseHeaders);
}
/**
 * Wraps the callable in a {@code ListenableFutureTask} and hands it to the concurrent
 * executor via {@code doExecute}.
 *
 * @param task the callable to run
 * @return the future task that was submitted
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
    ListenableFutureTask<T> listenableTask;
    try {
        listenableTask = new ListenableFutureTask<>(task);
        doExecute(this.concurrentExecutor, this.taskDecorator, listenableTask);
    }
    catch (RejectedExecutionException rejected) {
        throw new TaskRejectedException(
                "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, rejected);
    }
    return listenableTask;
}
/**
 * Wraps the runnable in a {@code ListenableFutureTask} with a {@code null} result and hands
 * it to the concurrent executor via {@code doExecute}.
 *
 * @param task the runnable to run
 * @return the future task that was submitted
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
    ListenableFutureTask<Object> listenableTask;
    try {
        listenableTask = new ListenableFutureTask<>(task, null);
        doExecute(this.concurrentExecutor, this.taskDecorator, listenableTask);
    }
    catch (RejectedExecutionException rejected) {
        throw new TaskRejectedException(
                "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, rejected);
    }
    return listenableTask;
}