类 org.springframework.util.concurrent.ListenableFuture 源码实例 Demo

下面列出了 org.springframework.util.concurrent.ListenableFuture 类的 API 使用实例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: spring-analysis-note   文件: SampleAsyncTests.java
/**
 * Sends an asynchronous GET for composer 42 and checks that the mock server saw it.
 * Only the outgoing request is validated; the response is a canned JSON payload.
 */
@Test
public void performGet() throws Exception {
	String composerJson = "{\"name\" : \"Ludwig van Beethoven\", \"someDouble\" : \"1.6035\"}";

	this.mockServer
			.expect(requestTo("/composers/42"))
			.andExpect(method(HttpMethod.GET))
			.andRespond(withSuccess(composerJson, MediaType.APPLICATION_JSON));

	// The returned future is deliberately left unread: this test asserts on the
	// request that was issued, not on the mocked response body.
	@SuppressWarnings("unused")
	ListenableFuture<ResponseEntity<Person>> composer =
			this.restTemplate.getForEntity("/composers/{id}", Person.class, 42);

	this.mockServer.verify();
}
 
/**
 * Blocks the calling thread until the given future completes (successfully or not),
 * or until one second has elapsed, whichever comes first.
 *
 * @param convertedFuture the future to wait on
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Override
public void waitForCalculationToFinish(ListenableFuture<String> convertedFuture) throws InterruptedException {
    final CountDownLatch completed = new CountDownLatch(1);

    // Success and failure both count as "finished", so either outcome releases the latch.
    ListenableFutureCallback<String> releaseOnCompletion = new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable t) {
            completed.countDown();
        }

        @Override
        public void onSuccess(String result) {
            completed.countDown();
        }
    };
    convertedFuture.addCallback(releaseOnCompletion);

    // The boolean outcome of await (released vs. timed out) is intentionally not inspected.
    completed.await(1, TimeUnit.SECONDS);
}
 
源代码3 项目: riptide   文件: AsyncHttpOperationsTest.java
/**
 * Supplies the three {@code execute} call variants exercised by the test: URI template
 * with varargs, URI template with a variable map, and a pre-built {@link URI}.
 * All variants POST the same JSON user and extract a {@code User} from the response.
 */
static Iterable<Function<AsyncRestOperations, ListenableFuture<User>>> execute() {
    final ObjectMapper json = new ObjectMapper();

    // Reads the response body back into a User.
    final ResponseExtractor<User> readUser = response ->
            json.readValue(response.getBody(), User.class);

    // Adds the test headers and writes a fixed User as the request body.
    final AsyncRequestCallback writeUser = request -> {
        request.getHeaders().add("Test", "true");
        request.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        json.writeValue(request.getBody(), new User("D. Fault", "1984-09-13"));
    };

    final Function<AsyncRestOperations, ListenableFuture<User>> viaVarargs =
            ops -> ops.execute("/departments/{id}/users", POST, writeUser, readUser, 1);
    final Function<AsyncRestOperations, ListenableFuture<User>> viaVariableMap =
            ops -> ops.execute("/departments/{id}/users", POST, writeUser, readUser, singletonMap("id", 1));
    final Function<AsyncRestOperations, ListenableFuture<User>> viaUri =
            ops -> ops.execute(URI.create("/departments/1/users"), POST, writeUser, readUser);

    return Arrays.asList(viaVarargs, viaVariableMap, viaUri);
}
 
/**
 * Inserts a user through the asynchronous template, waits (bounded) for the
 * completion callback, then reads the row back synchronously and compares it.
 */
@Test
public void insertAsynchronously() throws InterruptedException {

	User walter = new User();
	walter.setId(42L);
	walter.setUsername("heisenberg");
	walter.setFirstname("Walter");
	walter.setLastname("White");

	final CountDownLatch insertDone = new CountDownLatch(1);

	AsyncCassandraTemplate asyncOps = new AsyncCassandraTemplate(session);

	ListenableFuture<User> pendingInsert = asyncOps.insert(walter);

	// Either outcome releases the latch; a failed insert surfaces through the assertion below.
	pendingInsert.addCallback(
			inserted -> insertDone.countDown(),
			failure -> insertDone.countDown());

	insertDone.await(5, TimeUnit.SECONDS);

	User reloaded = template.selectOneById(walter.getId(), User.class);
	assertThat(reloaded).isEqualTo(walter);
}
 
/**
 * Starts an XHR-based SockJS session for the given transport request.
 * <p>Creates the client session, registers its timeout task on the request, builds the
 * handshake headers and delegates the actual connection to {@code connectInternal}.
 *
 * @param request the transport request; supplies the transport URL and handshake headers
 * @param handler the WebSocket handler passed to the session and to {@code connectInternal}
 * @return the future handed to the session and to {@code connectInternal}; this method
 * does not complete it itself
 */
@Override
@SuppressWarnings("deprecation")
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
	SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
	XhrClientSockJsSession session = new XhrClientSockJsSession(request, handler, this, connectFuture);
	request.addTimeoutTask(session.getTimeoutTask());

	URI receiveUrl = request.getTransportUrl();
	if (logger.isDebugEnabled()) {
		// Leading space before "session" keeps the mode and the word apart in the log line.
		logger.debug("Starting XHR " +
				(isXhrStreamingDisabled() ? "Polling" : "Streaming") + " session url=" + receiveUrl);
	}

	// Transport-level headers go in first, then the request's handshake headers on top.
	HttpHeaders handshakeHeaders = new HttpHeaders();
	handshakeHeaders.putAll(getRequestHeaders());
	handshakeHeaders.putAll(request.getHandshakeHeaders());

	connectInternal(request, handler, receiveUrl, handshakeHeaders, session, connectFuture);
	return connectFuture;
}
 
/**
 * Starts the WebSocket handshake and registers a callback that stores the session
 * on success or logs the error on failure. Does not wait for the handshake to finish.
 */
@Override
protected void openConnection() {
	if (logger.isInfoEnabled()) {
		logger.info("Connecting to WebSocket at " + getUri());
	}

	ListenableFutureCallback<WebSocketSession> handshakeOutcome =
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					logger.error("Failed to connect", ex);
				}
				@Override
				public void onSuccess(WebSocketSession result) {
					// Keep the established session on the enclosing instance.
					webSocketSession = result;
					logger.info("Successfully connected");
				}
			};

	this.client
			.doHandshake(this.webSocketHandler, this.headers, getUri())
			.addCallback(handshakeOutcome);
}
 
/**
 * Routes the request through the load balancer: the host of the request URI is used
 * as the service name, and the request is re-issued against the chosen instance.
 *
 * @param request the outgoing request
 * @param body the request body
 * @param execution the remaining execution chain
 * @return the future returned by the load balancer for the executed request
 * @throws IOException declared by the interceptor contract
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
		final byte[] body, final AsyncClientHttpRequestExecution execution)
		throws IOException {
	final String serviceId = request.getURI().getHost();

	LoadBalancerRequest<ListenableFuture<ClientHttpResponse>> perInstanceCall =
			new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
				@Override
				public ListenableFuture<ClientHttpResponse> apply(
						final ServiceInstance instance) throws Exception {
					// Wrap the original request together with the selected instance before continuing.
					HttpRequest rewritten = new ServiceRequestWrapper(request,
							instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
					return execution.executeAsync(rewritten, body);
				}
			};

	return this.loadBalancer.execute(serviceId, perInstanceCall);
}
 
/**
 * POSTs a plain-text entity via {@code exchange} and checks, from a callback, that the
 * response carries the expected Location header and no body.
 */
@Test
public void exchangePostCallback() throws Exception {
	HttpHeaders headers = new HttpHeaders();
	headers.set("MyHeader", "MyValue");
	headers.setContentType(MediaType.TEXT_PLAIN);
	HttpEntity<String> outgoing = new HttpEntity<>(helloWorld, headers);

	ListenableFuture<ResponseEntity<Void>> pendingResponse =
			template.exchange(baseUrl + "/{method}", HttpMethod.POST, outgoing, Void.class, "post");

	final URI expectedLocation = new URI(baseUrl + "/post/1");

	ListenableFutureCallback<ResponseEntity<Void>> checks =
			new ListenableFutureCallback<ResponseEntity<Void>>() {
				@Override
				public void onFailure(Throwable ex) {
					fail(ex.getMessage());
				}
				@Override
				public void onSuccess(ResponseEntity<Void> result) {
					assertEquals("Invalid location", expectedLocation, result.getHeaders().getLocation());
					assertFalse(result.hasBody());
				}
			};
	pendingResponse.addCallback(checks);

	waitTillDone(pendingResponse);
}
 
源代码9 项目: java-technology-stack   文件: AsyncResultTests.java
/**
 * Checks that an {@code AsyncResult} created from a value hands that same instance to
 * a registered callback, to {@code get()}, and to its {@code completable()} view.
 */
@Test
public void asyncResultWithCallbackAndValue() throws Exception {
	final String expected = "val";
	final Set<String> received = new HashSet<>(1);

	ListenableFuture<String> future = AsyncResult.forValue(expected);

	ListenableFutureCallback<String> recorder = new ListenableFutureCallback<String>() {
		@Override
		public void onFailure(Throwable ex) {
			fail("Failure callback not expected: " + ex);
		}
		@Override
		public void onSuccess(String result) {
			received.add(result);
		}
	};
	future.addCallback(recorder);

	// The set is read right after registration, so the callback must already have run.
	assertSame(expected, received.iterator().next());
	assertSame(expected, future.get());
	assertSame(expected, future.completable().get());
	future.completable().thenAccept(v -> assertSame(expected, v));
}
 
/**
 * Calls the "add" endpoint with URI template variables 3 and 4, first through the
 * synchronous template and then through the asynchronous one, expecting 7 both times.
 */
@Test
public void getsEndWithRequestVariables() throws Exception {
  final String addTemplate = controllerUrl + "add?a={a}&b={b}";

  int syncSum = restTemplate.getForObject(addTemplate, Integer.class, 3, 4);
  assertThat(syncSum, is(7));

  ListenableFuture<ResponseEntity<Integer>> pending =
      asyncRestTemplate.getForEntity(addTemplate, Integer.class, 3, 4);
  ResponseEntity<Integer> asyncResponse = pending.get();
  assertThat(asyncResponse.getBody(), is(7));
}
 
源代码11 项目: stateful-functions   文件: KafkaDriverPublisher.java
/**
 * Publishes the driver message to Kafka, keyed by the UTF-8 bytes of its driver id.
 * Send failures are logged as warnings; successful sends are ignored.
 *
 * @param driver the message to publish
 */
@Override
public void accept(InboundDriverMessage driver) {
  final byte[] key = driver.getDriverId().getBytes(StandardCharsets.UTF_8);
  final byte[] payload = driver.toByteArray();

  ListenableFutureCallback<SendResult<Object, Object>> sendOutcome =
      new ListenableFutureCallback<SendResult<Object, Object>>() {
        @Override
        public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
          // Nothing to do on success.
        }

        @Override
        public void onFailure(Throwable throwable) {
          log.warn("Failed sending an event to kafka", throwable);
        }
      };

  ListenableFuture<SendResult<Object, Object>> pendingSend = kafkaTemplate.send(topic, key, payload);
  pendingSend.addCallback(sendOutcome);
}
 
源代码12 项目: lams   文件: InterceptingAsyncClientHttpRequest.java
/**
 * Advances the interceptor chain by one step. While interceptors remain, the next one
 * is invoked with this object as the continuation; once they are exhausted, the request
 * is copied onto a new delegate request from the factory and executed.
 *
 * @param request the request to execute
 * @param body the request body; written to the delegate only when non-empty
 * @return the future produced by the interceptor or by the delegate request
 * @throws IOException if creating or writing the delegate request fails
 */
@Override
public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body)
		throws IOException {

	if (this.iterator.hasNext()) {
		AsyncClientHttpRequestInterceptor next = this.iterator.next();
		return next.intercept(request, body, this);
	}

	// No interceptors left: build the real request from the (possibly modified) one.
	URI targetUri = request.getURI();
	HttpMethod httpMethod = request.getMethod();
	HttpHeaders sourceHeaders = request.getHeaders();

	AsyncClientHttpRequest outgoing = requestFactory.createAsyncRequest(targetUri, httpMethod);
	outgoing.getHeaders().putAll(sourceHeaders);
	if (body.length > 0) {
		StreamUtils.copy(body, outgoing.getBody());
	}

	return outgoing.executeAsync();
}
 
/**
 * Posts a plain-text payload to the "textPlain" endpoint through both the synchronous
 * and the asynchronous template and expects the payload to be echoed back.
 */
@Test
public void ableToConsumeTextPlain() throws Exception {
  final String body = "a=1";
  final String endpoint = codeFirstUrl + "textPlain";

  String syncEcho = restTemplate.postForObject(endpoint, body, String.class);
  assertThat(jsonOf(syncEcho, String.class), is(body));

  HttpEntity<?> wrapped = new HttpEntity<>(body);
  ListenableFuture<ResponseEntity<String>> pending =
      asyncRestTemplate.postForEntity(endpoint, wrapped, String.class);
  ResponseEntity<String> asyncEcho = pending.get();
  assertThat(jsonOf(asyncEcho.getBody(), String.class), is(body));
}
 
源代码14 项目: spring-analysis-note   文件: AsyncRestTemplate.java
/**
 * Issues an asynchronous OPTIONS request against the expanded URL template and
 * adapts the resulting headers future via {@code adaptToAllowHeader}.
 *
 * @param url the URL template
 * @param uriVars the variables used to expand the template
 * @return a future yielding the set of HTTP methods derived from the response headers
 */
@Override
public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVars)
		throws RestClientException {

	// No request callback is needed for OPTIONS; only the response headers are extracted.
	ListenableFuture<HttpHeaders> headersFuture =
			execute(url, HttpMethod.OPTIONS, null, headersExtractor(), uriVars);
	return adaptToAllowHeader(headersFuture);
}
 
源代码15 项目: spring-cloud-gcp   文件: WebController.java
/**
 * Waits for the load job to finish and builds the "index" view with a status message.
 *
 * @param loadJob future for the load job; this method blocks until it completes
 * @param tableName table name, used only in the success message
 * @return the "index" {@code ModelAndView} carrying "datasetName" and "message"
 */
private ModelAndView getResponse(ListenableFuture<Job> loadJob, String tableName) {
	String message;
	try {
		// Only completion matters here; the returned Job itself is not used.
		loadJob.get();
		message = "Successfully loaded data file to " + tableName;
	}
	catch (InterruptedException e) {
		// Restore the interrupt flag so code further up the stack can still observe it.
		Thread.currentThread().interrupt();
		e.printStackTrace();
		message = "Error: " + e.getMessage();
	}
	catch (Exception e) {
		e.printStackTrace();
		message = "Error: " + e.getMessage();
	}

	return new ModelAndView("index")
			.addObject("datasetName", this.datasetName)
			.addObject("message", message);
}
 
/**
 * Delegates to the return value handler selected for the given return type, which
 * must be an {@code AsyncHandlerMethodReturnValueHandler}.
 *
 * @param returnValue the value returned from the handler method
 * @param returnType the declared return type
 * @return whatever the selected handler returns for this value
 * @throws IllegalStateException if the selected handler is not an async handler
 */
@Override
@Nullable
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
	HandlerMethodReturnValueHandler selected = getReturnValueHandler(returnType);
	boolean supportsAsync = selected instanceof AsyncHandlerMethodReturnValueHandler;
	Assert.state(supportsAsync, "AsyncHandlerMethodReturnValueHandler required");

	AsyncHandlerMethodReturnValueHandler asyncHandler = (AsyncHandlerMethodReturnValueHandler) selected;
	return asyncHandler.toListenableFuture(returnValue, returnType);
}
 
源代码17 项目: spring-analysis-note   文件: SampleAsyncTests.java
/**
 * Registers four expected GET requests to "/number" but performs only two, then
 * checks that a verification failure reports two unsatisfied expectations.
 */
@Test
public void verify() {

	// Four expectations for the same URL, each answering with a different body.
	for (String body : new String[] {"1", "2", "4", "8"}) {
		this.mockServer.expect(requestTo("/number")).andExpect(method(HttpMethod.GET))
			.andRespond(withSuccess(body, MediaType.TEXT_PLAIN));
	}

	// First call consumes the "1" expectation.
	@SuppressWarnings("unused")
	ListenableFuture<ResponseEntity<String>> result = this.restTemplate.getForEntity("/number", String.class);

	// Second call consumes the "2" expectation; "4" and "8" stay unsatisfied.
	result = this.restTemplate.getForEntity("/number", String.class);

	try {
		this.mockServer.verify();
	}
	catch (AssertionError error) {
		String message = error.getMessage();
		assertTrue(message, message.contains("2 unsatisfied expectation(s)"));
	}
}
 
/**
 * Pulls messages asynchronously and exposes them, converted to the requested payload
 * type, through a separate future. A failed pull is propagated as that future's exception.
 *
 * @param subscription forwarded to {@code pullAsync}
 * @param maxMessages forwarded to {@code pullAsync}
 * @param returnImmediately forwarded to {@code pullAsync}
 * @param payloadType the target type for payload conversion
 * @return a future completed with the converted messages or with the pull failure
 */
@Override
public <T> ListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription,
		Integer maxMessages, Boolean returnImmediately, Class<T> payloadType) {
	final SettableListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> converted =
			new SettableListenableFuture<>();

	this.pullAsync(subscription, maxMessages, returnImmediately).addCallback(
			pulled -> {
				converted.set(this.toConvertedAcknowledgeablePubsubMessages(payloadType, pulled));
			},
			failure -> {
				converted.setException(failure);
			});

	return converted;
}
 
/**
 * Registers a {@code ThreadPoolTaskExecutor} bean named "e1" and checks that the
 * unqualified async method runs on a non-"e1-" thread other than the caller, while the
 * "e1" methods run on threads whose names start with "e1-".
 */
@Test
public void qualifiedAsyncMethodsAreRoutedToCorrectExecutor() throws InterruptedException, ExecutionException {
	DefaultListableBeanFactory factory = new DefaultListableBeanFactory();
	factory.registerBeanDefinition("e1", new RootBeanDefinition(ThreadPoolTaskExecutor.class));
	AnnotationAsyncExecutionAspect.aspectOf().setBeanFactory(factory);

	ClassWithQualifiedAsyncMethods target = new ClassWithQualifiedAsyncMethods();

	// Unqualified method: must run on some other thread, and not on an "e1-" one.
	Future<Thread> defaultThread = target.defaultWork();
	Thread defaultWorker = defaultThread.get();
	assertThat(defaultWorker, not(Thread.currentThread()));
	assertThat(defaultWorker.getName(), not(startsWith("e1-")));

	// Qualified methods, one per supported future return type.
	ListenableFuture<Thread> e1Thread = target.e1Work();
	String listenableWorkerName = e1Thread.get().getName();
	assertThat(listenableWorkerName, startsWith("e1-"));

	CompletableFuture<Thread> e1OtherThread = target.e1OtherWork();
	String completableWorkerName = e1OtherThread.get().getName();
	assertThat(completableWorkerName, startsWith("e1-"));
}
 
源代码20 项目: java-technology-stack   文件: AsyncRestTemplate.java
/**
 * Wraps a headers future so that it yields the value of {@code getLocation()} of the
 * completed headers instead of the headers themselves.
 *
 * @param future the future producing the response headers
 * @return a future producing the location reported by those headers
 */
private static ListenableFuture<URI> adaptToLocationHeader(ListenableFuture<HttpHeaders> future) {
	ListenableFutureAdapter<URI, HttpHeaders> locationAdapter =
			new ListenableFutureAdapter<URI, HttpHeaders>(future) {
				@Override
				@Nullable
				protected URI adapt(HttpHeaders responseHeaders) throws ExecutionException {
					URI location = responseHeaders.getLocation();
					return location;
				}
			};
	return locationAdapter;
}
 
/**
 * Smoke test: calls "sayhi?name=world" through the synchronous and the asynchronous
 * template and expects the same greeting from both.
 */
@Test
public void ensureServerWorksFine() throws Exception {
  final String greetingUrl = controllerUrl + "sayhi?name=world";

  String syncGreeting = restTemplate.getForObject(greetingUrl, String.class);
  assertThat(jsonOf(syncGreeting, String.class), is("hi world [world]"));

  ListenableFuture<ResponseEntity<String>> pending =
      asyncRestTemplate.getForEntity(greetingUrl, String.class);
  ResponseEntity<String> asyncGreeting = pending.get();
  assertThat(jsonOf(asyncGreeting.getBody(), String.class), is("hi world [world]"));
}
 
源代码22 项目: BlogManagePlatform   文件: AsyncMethodLogAdvisor.java
/**
 * Builds the AOP advice that logs the arguments and the result of each intercepted
 * asynchronous method.
 * <p>Before the call, the arguments are logged as JSON keyed by parameter name (or a
 * note that the method takes none). After the call, the result is logged: for a
 * {@code ListenableFuture} result the advice calls {@code get()} on it, which blocks
 * until the value is available, and logs that value; any other result is logged as-is.
 * Methods declared {@code void} or {@code Void} only get a note that nothing is returned.
 * @author Frodez
 * @date 2019-05-10
 */
@SuppressWarnings("unchecked")
@Override
public Advice getAdvice() {
	return (MethodInterceptor) invocation -> {
		Method method = invocation.getMethod();
		String name = ReflectUtil.fullName(method);
		if (method.getParameterCount() != 0) {
			Parameter[] parameters = method.getParameters();
			Object[] args = invocation.getArguments();
			Map<String, Object> paramMap = new HashMap<>(parameters.length);
			for (int i = 0; i < parameters.length; ++i) {
				paramMap.put(parameters[i].getName(), args[i]);
			}
			log.info("{} 请求参数:{}", name, JSONUtil.string(paramMap));
		} else {
			log.info("{} 本方法无入参", name);
		}
		Object result = invocation.proceed();
		Class<?> returnType = method.getReturnType();
		// Method#getReturnType() reports void.class (not Void.class) for a void method,
		// so both must be checked to recognize "no return value".
		if (returnType != void.class && returnType != Void.class) {
			if (result instanceof ListenableFuture) {
				log.info("{} 返回值:{}", name, JSONUtil.string(((ListenableFuture<Result>) result).get()));
			} else {
				// Covers null and any result that is not a ListenableFuture.
				log.info("{} 返回值:{}", name, JSONUtil.string(result));
			}
		} else {
			log.info("{} 本方法返回值类型为void", name);
		}
		return result;
	};
}
 
/**
 * Posts an ISO-8859-1 plain-text entity via {@code postForLocation} and verifies the
 * returned location from lambda callbacks.
 */
@Test
public void postForLocationCallbackWithLambdas() throws Exception  {
	MediaType latin1Text = new MediaType("text", "plain", StandardCharsets.ISO_8859_1);
	HttpHeaders headers = new HttpHeaders();
	headers.setContentType(latin1Text);
	HttpEntity<String> outgoing = new HttpEntity<>(helloWorld, headers);

	final URI expectedLocation = new URI(baseUrl + "/post/1");

	ListenableFuture<URI> pendingLocation = template.postForLocation(baseUrl + "/{method}", outgoing, "post");
	pendingLocation.addCallback(
			location -> {
				assertEquals("Invalid location", expectedLocation, location);
			},
			failure -> {
				fail(failure.getMessage());
			});

	waitTillDone(pendingLocation);
}
 
源代码24 项目: java-technology-stack   文件: AsyncRestTemplate.java
/**
 * Executes the request against an already-built {@link URI} by handing all arguments
 * unchanged to {@code doExecute}.
 *
 * @param url the target URI
 * @param method the HTTP method
 * @param requestCallback optional callback, passed through to {@code doExecute}
 * @param responseExtractor optional extractor, passed through to {@code doExecute}
 * @return the future returned by {@code doExecute}
 */
@Override
public <T> ListenableFuture<T> execute(URI url, HttpMethod method,
		@Nullable AsyncRequestCallback requestCallback,
		@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

	ListenableFuture<T> pending = doExecute(url, method, requestCallback, responseExtractor);
	return pending;
}
 
源代码25 项目: riptide   文件: AsyncHttpOperations.java
/**
 * Executes a request against a URI template expanded with positional variables.
 * The callback and extractor are adapted via {@code toEntity}, {@code extractTo} and
 * {@code route}, and the call is forwarded to the capture-based {@code execute} overload.
 *
 * @param url the URI template
 * @param method the HTTP method
 * @param callback optional request callback
 * @param extractor optional response extractor
 * @param uriVariables values used to expand the template
 * @return the future returned by the delegated {@code execute} call
 */
@Override
public <T> ListenableFuture<T> execute(final String url, final HttpMethod method,
        @Nullable final AsyncRequestCallback callback, @Nullable final ResponseExtractor<T> extractor,
        final Object... uriVariables) {

    // The same capture is handed to extractTo and to the delegated execute call.
    final Capture<T> extracted = Capture.empty();
    return execute(
            url,
            method,
            toEntity(callback),
            route(extractTo(extractor, extracted)),
            extracted,
            uriVariables);
}
 
源代码26 项目: haven-platform   文件: DockerServiceImpl.java
/**
 * Requests the log stream of a container and feeds it to the watcher from the argument.
 * When neither stderr nor stdout is selected, both are requested.
 *
 * @param arg container id, stream selection, follow/since/tail/timestamps options,
 *            and the optional watcher and interrupter
 * @return the call result, filled in by {@code waitFuture} or, on an HTTP status
 *         error, by {@code processStatusCodeException}
 */
@Override
public ServiceCallResult getContainerLog(GetLogContainerArg arg) {

    ServiceCallResult callResult = new ServiceCallResult();

    // Fall back to a no-op consumer when the caller supplied no watcher.
    final Consumer<ProcessEvent> eventSink = firstNonNull(arg.getWatcher(), Consumers.<ProcessEvent>nop());

    boolean wantStderr = arg.isStderr();
    boolean wantStdout = arg.isStdout();
    if (!(wantStderr || wantStdout)) {
        // At least one stream is required; default to both.
        wantStderr = true;
        wantStdout = true;
    }

    URI logsUri = getUrlContainer(arg.getId(), "logs")
            .queryParam("stderr", wantStderr)
            .queryParam("stdout", wantStdout)
            .queryParam("follow", arg.isFollow())
            .queryParam("since", arg.getSince())
            .queryParam("tail", arg.getTail())
            .queryParam("timestamps", arg.isTimestamps())
            .build()
            .toUri();

    try {
        ListenableFuture<Object> streaming = restTemplate.execute(logsUri, HttpMethod.GET, null, response -> {
            StreamContext<ProcessEvent> streamContext = new StreamContext<>(response.getBody(), eventSink);
            streamContext.getInterrupter().setFuture(arg.getInterrupter());
            frameStreamProcessor.processResponseStream(streamContext);
            // The extractor produces no value; events are delivered through the consumer.
            return null;
        });
        waitFuture(callResult, streaming);
    } catch (HttpStatusCodeException e) {
        processStatusCodeException(e, callResult, logsUri);
    }
    return callResult;
}
 
/**
 * Issues an asynchronous HEAD request and checks, from lambda callbacks, that the
 * response headers contain a Content-Type entry.
 */
@Test
public void headForHeadersCallbackWithLambdas() throws Exception {
	ListenableFuture<HttpHeaders> pendingHeaders = template.headForHeaders(baseUrl + "/get");
	pendingHeaders.addCallback(
			headers -> {
				assertTrue("No Content-Type header", headers.containsKey("Content-Type"));
			},
			failure -> {
				fail(failure.getMessage());
			});
	waitTillDone(pendingHeaders);
}
 
/**
 * Sends a HEAD request through the async template and asserts in the success callback
 * that a Content-Type header is present; any failure fails the test with its message.
 */
@Test
public void headForHeadersCallbackWithLambdas() throws Exception {
	final String target = baseUrl + "/get";
	ListenableFuture<HttpHeaders> headFuture = template.headForHeaders(target);
	headFuture.addCallback(
			received -> assertTrue("No Content-Type header", received.containsKey("Content-Type")),
			error -> fail(error.getMessage()));
	waitTillDone(headFuture);
}
 
/**
 * Wraps the callable in a {@code ListenableFutureTask} and hands it to the underlying
 * executor through {@code doExecute}.
 *
 * @param task the callable to run
 * @return the future task wrapping the callable
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
	ListenableFutureTask<T> wrapped = new ListenableFutureTask<>(task);
	try {
		doExecute(this.concurrentExecutor, this.taskDecorator, wrapped);
	}
	catch (RejectedExecutionException rejection) {
		// Rethrow as TaskRejectedException, keeping the original rejection as the cause.
		String detail = "Executor [" + this.concurrentExecutor + "] did not accept task: " + task;
		throw new TaskRejectedException(detail, rejection);
	}
	return wrapped;
}
 
源代码30 项目: spring-analysis-note   文件: TaskExecutorAdapter.java
/**
 * Wraps the runnable in a {@code ListenableFutureTask} with a {@code null} result and
 * hands it to the underlying executor through {@code doExecute}.
 *
 * @param task the runnable to run
 * @return the future task wrapping the runnable
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
	ListenableFutureTask<Object> wrapped = new ListenableFutureTask<>(task, null);
	try {
		doExecute(this.concurrentExecutor, this.taskDecorator, wrapped);
	}
	catch (RejectedExecutionException rejection) {
		// Rethrow as TaskRejectedException, keeping the original rejection as the cause.
		String detail = "Executor [" + this.concurrentExecutor + "] did not accept task: " + task;
		throw new TaskRejectedException(detail, rejection);
	}
	return wrapped;
}
 
 类所在包
 同包方法