下面列出了 com.google.common.util.concurrent.ListenableFuture#get() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Uploads the given BLOBs concurrently to the remote {@code ByteStream} service and blocks until
 * every upload has completed, or throws as soon as the first failed upload is observed. Uploads
 * still in flight at that point keep running in the background until they complete or
 * {@link #shutdown()} is called; errors they encounter are swallowed.
 *
 * <p>Uploads are retried according to the specified {@link Retrier}. Retrying is transparent to
 * the user of this API.
 *
 * <p>Concurrent attempts to upload the same BLOB result in a single upload being performed. This
 * is transparent to the user of this API.
 *
 * @param chunkers the BLOBs to upload, keyed by their {@code HashCode}
 * @throws IOException when the upload failed due to content issues
 * @throws InterruptedException if the calling thread is interrupted while waiting for an upload
 */
public void uploadBlobs(Map<HashCode, Chunker> chunkers)
    throws IOException, InterruptedException {
  // Start every upload before waiting on any of them so they run concurrently.
  List<ListenableFuture<Void>> pending = Lists.newArrayList();
  for (Map.Entry<HashCode, Chunker> entry : chunkers.entrySet()) {
    HashCode hash = entry.getKey();
    Chunker chunker = entry.getValue();
    pending.add(uploadBlobAsync(hash, chunker));
  }

  for (ListenableFuture<Void> future : pending) {
    try {
      future.get();
    } catch (ExecutionException e) {
      // Unwrap the failure: IOExceptions and unchecked exceptions are rethrown as-is,
      // anything else is wrapped in a RuntimeException.
      Throwable failure = e.getCause();
      propagateIfInstanceOf(failure, IOException.class);
      throwIfUnchecked(failure);
      throw new RuntimeException(failure);
    }
  }
}
/**
 * Checks that a unary future call closed with {@code Status.INTERNAL} fails its future, and that
 * both the status and the exact trailers instance can be recovered from the thrown exception.
 */
@Test
public void unaryFutureCallFailed() throws Exception {
  final AtomicReference<ClientCall.Listener<String>> capturedListener =
      new AtomicReference<ClientCall.Listener<String>>();
  // Capture the listener handed to start() so the test can close the call by hand.
  NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
    @Override
    public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
      capturedListener.set(responseListener);
    }
  };
  Integer request = 2;
  ListenableFuture<String> responseFuture = ClientCalls.futureUnaryCall(call, request);

  Metadata trailers = new Metadata();
  capturedListener.get().onClose(Status.INTERNAL, trailers);

  try {
    responseFuture.get();
    fail("Should fail");
  } catch (ExecutionException expected) {
    assertEquals(Status.INTERNAL, Status.fromThrowable(expected));
    assertSame(trailers, Status.trailersFromThrowable(expected));
  }
}
/**
 * Sends a log item using a project config built by
 * {@code buildInvalidAccessKeySecretProjectConfig()} and verifies that the send fails with a
 * {@code SignatureNotMatch} error on both the result and its single reserved attempt.
 */
@Test
public void testSendWithInvalidAccessKeySecret() throws InterruptedException, ProducerException {
  ProducerConfig producerConfig = new ProducerConfig();
  final Producer producer = new LogProducer(producerConfig);
  producer.putProjectConfig(buildInvalidAccessKeySecretProjectConfig());
  ListenableFuture<Result> f =
      producer.send(System.getenv("PROJECT"), System.getenv("LOG_STORE"), buildLogItem());
  try {
    f.get();
    // Without this the test would pass silently if the send unexpectedly succeeded.
    Assert.fail("Expected the send to fail with SignatureNotMatch");
  } catch (ExecutionException e) {
    ResultFailedException resultFailedException = (ResultFailedException) e.getCause();
    Result result = resultFailedException.getResult();
    Assert.assertFalse(result.isSuccessful());
    Assert.assertEquals("SignatureNotMatch", result.getErrorCode());
    Assert.assertFalse(result.getErrorMessage().isEmpty());
    List<Attempt> attempts = result.getReservedAttempts();
    Assert.assertEquals(1, attempts.size());
    for (Attempt attempt : attempts) {
      Assert.assertFalse(attempt.isSuccess());
      Assert.assertEquals("SignatureNotMatch", attempt.getErrorCode());
      Assert.assertFalse(attempt.getErrorMessage().isEmpty());
      Assert.assertFalse(attempt.getRequestId().isEmpty());
    }
  }
}
/**
 * Verifies that {@code unmap} surfaces an {@link IOException} cause when deleting the group
 * through the identity service fails.
 */
@Test
public void testUnmapFails() throws IOException, InterruptedException, ExecutionException {
  Membership member =
      new Membership().setPreferredMemberKey(new EntityKey().setId("[email protected]"));
  Set<Membership> members = Collections.singleton(member);
  IdentityGroup group =
      new IdentityGroup.Builder()
          .setGroupIdentity("domain\\group1")
          .setGroupKey(GROUP_KEY)
          .setMembers(members)
          .setGroupResourceName("groups/id1")
          .build();
  // Make the delete call fail so the future returned by unmap() fails as well.
  when(mockIdentityService.deleteGroup("groups/id1"))
      .thenReturn(Futures.immediateFailedFuture(new IOException("error deleting group")));

  ListenableFuture<Boolean> unmapResult = group.unmap(mockIdentityService);

  thrown.expectCause(isA(IOException.class));
  unmapResult.get();
}
/**
 * Deletes the current item from the remote table named by the operation and waits for the
 * deletion to finish. A failure recognised by {@code ExceptionIs404NotFound} is ignored; any
 * other failure is rethrown as the underlying cause.
 *
 * @param operation the delete operation identifying the target table
 * @return always {@code null}
 * @throws Throwable the cause of a failed deletion, unless it is a 404 not-found failure
 */
@Override
public JsonObject visit(DeleteOperation operation) throws Throwable {
    MobileServiceJsonTable remoteTable = this.getRemoteTable(operation.getTableName());
    ListenableFuture<Void> pendingDelete = remoteTable.delete(this.mItem);

    try {
        pendingDelete.get();
    } catch (ExecutionException ex) {
        boolean alreadyGone = ExceptionIs404NotFound(ex);
        if (!alreadyGone) {
            throw ex.getCause();
        }
    }

    return null;
}
/**
 * Shows how an Observable can be turned into a ListenableFuture.
 * ListenableFuture (from the Google Guava library) extends Java's Future
 * with support for registering listener callbacks:
 * https://github.com/google/guava/wiki/ListenableFutureExplained
 */
@Test(groups = "samples", timeOut = TIMEOUT)
public void transformObservableToGoogleGuavaListenableFuture() throws Exception {
    int pageSize = 3;
    FeedOptions feedOptions = new FeedOptions();
    feedOptions.setMaxItemCount(pageSize);

    Observable<FeedResponse<Document>> pages = client
            .queryDocuments(getCollectionLink(), "SELECT * FROM root", feedOptions);

    // Collapse the stream of pages into a single emission holding all pages
    Observable<List<FeedResponse<Document>>> allPages = pages.toList();

    // Bridge that single emission into a Future and block for it
    ListenableFuture<List<FeedResponse<Document>>> allPagesFuture = ListenableFutureObservable.to(allPages);
    List<FeedResponse<Document>> retrievedPages = allPagesFuture.get();

    int retrievedDocumentCount = retrievedPages.stream()
            .mapToInt(page -> page.getResults().size())
            .sum();

    assertThat(numberOfDocuments, equalTo(retrievedDocumentCount));
}
/**
 * Waits for the address lookup in {@code future}, builds the URLs for {@code suffix} via
 * {@code genUrls} and stores them in {@code httpsUrls} or {@code httpUrls} depending on
 * {@code isHttps}. Nothing is stored when the lookup fails or yields {@code null}.
 *
 * @param future  pending address lookup
 * @param suffix  key under which the generated URLs are stored
 * @param isHttps {@code true} to store into the HTTPS map, {@code false} for the HTTP map
 */
private void updateUrl(ListenableFuture<String[][]> future, String suffix, boolean isHttps) {
    String[][] addresses;
    try {
        addresses = future.get();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        logger.info("从entrypointv2接口获取http链接失败!", e);
        return;
    }
    if (addresses == null) {
        return;
    }

    String[] urls = genUrls(suffix, addresses, isHttps);
    if (isHttps) {
        httpsUrls.put(suffix, urls);
    } else {
        httpUrls.put(suffix, urls);
    }
}
/**
 * Executes a QueryMessage and returns the resulting tuples.
 *
 * <p>A non-OK acknowledgement code is logged, but the responses are still processed. If the
 * calling thread is interrupted while waiting, the interrupt flag is restored before returning.
 *
 * @param qm the query to execute
 * @return an empty list if waiting for the result fails or the result carries no responses;
 *     otherwise the rows of the first response, converted by {@code resultsToMap}
 */
private List<Map<String, PrimitiveTypeProvider>> executeQuery(QueryMessage qm) {
  ListenableFuture<QueryResultsMessage> f = this.adampro.standardQuery(qm);
  QueryResultsMessage result;
  try {
    result = f.get();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    LOGGER.error(LogHelper.getStackTrace(e));
    return new ArrayList<>(0);
  } catch (ExecutionException e) {
    LOGGER.error(LogHelper.getStackTrace(e));
    return new ArrayList<>(0);
  }

  if (result.getAck().getCode() != AckMessage.Code.OK) {
    LOGGER.error("Query returned non-OK result code {} with message: {}",
        result.getAck().getCode(),
        result.getAck().getMessage());
  }

  if (result.getResponsesCount() == 0) {
    return new ArrayList<>(0);
  }

  QueryResultInfoMessage response = result.getResponses(0); // only head (end-result) is important
  List<QueryResultTupleMessage> resultList = response.getResultsList();
  return resultsToMap(resultList);
}
/**
 * Stubs the inner poster to return futures that never complete, issues three posts through
 * {@code outer}, and verifies that the future returned for a fourth post is already done and
 * fails with an {@link ExecutionException}.
 * NOTE(review): presumably {@code outer} limits outstanding requests to three — confirm against
 * its construction in the test fixture.
 */
@Test
public void overflowMeterTest() throws Exception {
    when(inner.post(anyString(), anyString(), anyMap())).thenAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
            // Never completed, so every accepted request stays outstanding.
            return SettableFuture.<Integer>create();
        }
    });
    outer.post(url, "foo", json);
    outer.post(url, "foo", json2);
    outer.post(url, "foo", json3);
    ListenableFuture<Integer> overflow = outer.post(url, "foo", json4);

    // The Java `assert` keyword is a no-op unless the JVM runs with -ea, so check explicitly.
    if (!overflow.isDone()) {
        throw new AssertionError("overflow future should already be done");
    }
    exception.expect(ExecutionException.class);
    overflow.get();
}
/**
 * Terminates a subscription through the billing client and waits up to {@code billingTimeout}
 * seconds for completion. If the call fails with a Recurly API error reporting
 * {@code refund_invalid} or {@code invalid_transaction} while a refund was requested, the
 * termination is retried once without a refund; every other failure is rethrown.
 *
 * @param subscriptionId the subscription to terminate
 * @param refund         the refund type to request
 * @throws Exception the original failure when no retry applies, or the failure of the retry
 */
private void terminateSubscription(final String subscriptionId, final RefundType refund) throws Exception {
   logger.debug("Call terminate subscription: {} with refund: {}", subscriptionId, refund);
   ListenableFuture<Subscription> future = billingClient.terminateSubscription(subscriptionId, refund);
   try {
      future.get(billingTimeout, TimeUnit.SECONDS);
   } catch (Exception e) {
      // Nothing to fall back to when no refund was requested or the failure is not a Recurly API error.
      if (refund == RefundType.NONE || !(e.getCause() instanceof RecurlyAPIErrorException)) {
         throw e;
      }

      RecurlyAPIErrorException apiError = (RecurlyAPIErrorException) e.getCause();
      logger.debug("Recurly API Error Received: {}", apiError.getErrors());
      RecurlyErrors errs = apiError.getErrors();
      boolean refundRejected = errs.stream().anyMatch(
            re -> re.getErrorSymbol().equals("refund_invalid") || re.getErrorSymbol().equals("invalid_transaction"));
      if (!refundRejected) {
         throw e;
      }

      logger.info("recurly reported refund_invalid or invalid_transaction - previously refunded {} refunding {}, retrying with no refund", refund, subscriptionId);
      terminateSubscription(subscriptionId, RefundType.NONE);
   }
}
/**
 * Runs the callback for every statement unit: the first unit on the calling thread, the
 * remaining ones through {@code asyncExecute}. Any exception is passed to
 * {@code ExecutorExceptionHandler.handleException}.
 *
 * @return an empty list when there are no units; otherwise the first unit's output followed by
 *     the outputs of the remaining units, or {@code null} if the exception handler returns
 *     normally after a failure
 */
private <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    if (baseStatementUnits.isEmpty()) {
        return Collections.emptyList();
    }
    Iterator<? extends BaseStatementUnit> units = baseStatementUnits.iterator();
    // Take the first SQL statement execution unit; it is run synchronously below.
    BaseStatementUnit headUnit = units.next();
    // Hand the remaining units over for asynchronous, multi-threaded execution.
    ListenableFuture<List<T>> tailFuture = asyncExecute(sqlType, Lists.newArrayList(units), parameterSets, executeCallback);
    T headOutput;
    List<T> tailOutputs;
    try {
        // Execute the first unit on the current thread.
        headOutput = syncExecute(sqlType, headUnit, parameterSets, executeCallback);
        // Collect the results of the asynchronous executions.
        tailOutputs = tailFuture.get();
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    // Put the synchronous result in front of the asynchronous ones.
    List<T> combined = Lists.newLinkedList(tailOutputs);
    combined.add(0, headOutput);
    return combined;
}
/**
 * Checks that wrapping an already-failed future yields a set future that is done and fails with
 * the original cause.
 */
@Test public void createFutureSingletonSet_failure() throws Exception {
  ListenableFuture<String> failedInput =
      Futures.immediateFailedFuture(new RuntimeException("monkey"));
  ListenableFuture<Set<String>> singletonSetFuture = Producers.createFutureSingletonSet(failedInput);
  assertThat(singletonSetFuture.isDone()).isTrue();

  ExecutionException thrown = null;
  try {
    singletonSetFuture.get();
  } catch (ExecutionException e) {
    thrown = e;
  }
  if (thrown == null) {
    fail();
  }
  assertThat(thrown.getCause()).hasMessage("monkey");
}
/**
 * Offers more elements to an {@code AsyncQueue} than it accepts without back-pressure and checks
 * when the futures returned by {@code offer} complete as batches are removed and the queue is
 * finished.
 * NOTE(review): presumably the first constructor argument (4) is the target queue size — confirm
 * against AsyncQueue.
 */
@Test(timeOut = 10_000)
public void testFullQueue()
throws Exception
{
AsyncQueue<String> queue = new AsyncQueue<>(4, executor);
// The first three offers return futures that are already done.
assertTrue(queue.offer("1").isDone());
assertTrue(queue.offer("2").isDone());
assertTrue(queue.offer("3").isDone());
// From the fourth element on, the returned futures are not done yet.
assertFalse(queue.offer("4").isDone());
assertFalse(queue.offer("5").isDone());
ListenableFuture<?> offerFuture = queue.offer("6");
assertFalse(offerFuture.isDone());
// Removing two elements is not enough to complete the pending offer future.
assertEquals(queue.getBatchAsync(2).get(), ImmutableList.of("1", "2"));
assertFalse(offerFuture.isDone());
// After a third element is removed, waiting on the offer future returns.
assertEquals(queue.getBatchAsync(1).get(), ImmutableList.of("3"));
offerFuture.get();
offerFuture = queue.offer("7");
assertFalse(offerFuture.isDone());
// finish() lets the pending offer future complete, but the queue does not report
// finished while elements remain in it.
queue.finish();
offerFuture.get();
assertFalse(queue.isFinished());
// Draining the remaining elements (including "7", offered before finish()) makes
// isFinished() return true.
assertEquals(queue.getBatchAsync(4).get(), ImmutableList.of("4", "5", "6", "7"));
assertTrue(queue.isFinished());
}
/**
 * Starts the GTP client if it is not running, asks the engine for its name (waiting up to 60
 * seconds) and returns the analyzer implementation matching the reported engine.
 *
 * @return the analyzer for the detected engine
 * @throws GenericLizzieException if the name response is not successful
 *     ({@code ENGINE_NOT_FUNCTION}) or the engine name is not recognised
 *     ({@code ENGINE_NOT_SUPPORTED})
 */
@Override
public AbstractGtpBasedAnalyzer build() {
    if (!gtpClient.isRunning()) {
        gtpClient.start();
    }

    // Check for engine ready
    ListenableFuture<List<String>> future = gtpClient.postCommand("name");
    List<String> nameResponse = null;
    try {
        nameResponse = future.get(60, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently dropping the interruption.
        Thread.currentThread().interrupt();
    } catch (ExecutionException | TimeoutException ignored) {
        // Deliberately ignored: nameResponse stays null and the readiness check below
        // decides how to treat it.
    }

    if (!GtpCommand.isSuccessfulResponse(nameResponse)) {
        throw new GenericLizzieException(ImmutableMap.of(REASON, ENGINE_NOT_FUNCTION));
    }

    String name = GtpCommand.getLineWithoutResponseHeader(nameResponse, 0).trim();
    if (name.equals("Leela Zero")) {
        int leelazEngineVersion = getLeelazEngineVersion();
        if (leelazEngineVersion == 2) {
            return new OfficialLeelazAnalyzerV2(gtpClient);
        } else if (leelazEngineVersion == 1) {
            return new OfficialLeelazAnalyzerV1(gtpClient);
        } else {
            detectCorrectModifiedLeelazEngine();
            return new ClassicModifiedLeelazAnalyzer(gtpClient);
        }
    } else if (name.equals("Leela Zero Phoenix")) {
        return new PhoenixGoAnalyzer(gtpClient);
    } else {
        throw new GenericLizzieException(ImmutableMap.of(REASON, ENGINE_NOT_SUPPORTED));
    }
}
/**
 * Populates the topic tree with all information from the ClientSessionPersistence.
 *
 * <p>Blocks until the set of all clients is available. Clients without a session, or whose
 * session expires on disconnect, have their subscriptions removed locally instead of being added
 * to the tree. Failures are logged and not rethrown; if the failure was an interruption, the
 * thread's interrupt flag is restored.
 */
private void populateTopicTree() {
    final ListenableFuture<Set<String>> clientsFuture = clientSessionPersistence.getAllClients();
    // Blocking. The TopicTreeStartup needs to be done before new connections are allowed.
    try {
        final Set<String> clients = clientsFuture.get();
        for (final String client : clients) {
            final Set<Topic> clientSubscriptions = clientSessionSubscriptionPersistence.getSubscriptions(client);
            final ClientSession session = clientSessionPersistence.getSession(client, false);
            if (session == null || session.getSessionExpiryInterval() == SESSION_EXPIRE_ON_DISCONNECT) {
                // We don't have to remove the subscription from the topic tree, since it is not added to the topic tree yet.
                clientSessionSubscriptionPersistence.removeAllLocally(client);
                continue;
            }

            for (final Topic topic : clientSubscriptions) {
                final SharedSubscription sharedSubscription = sharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
                if (sharedSubscription == null) {
                    topicTree.addTopic(client, topic, SubscriptionFlags.getDefaultFlags(false, topic.isRetainAsPublished(), topic.isNoLocal()), null);
                } else {
                    // Shared subscriptions are registered under their topic filter and share name.
                    topicTree.addTopic(client, new Topic(sharedSubscription.getTopicFilter(), topic.getQoS(), topic.isNoLocal(), topic.isRetainAsPublished()), SubscriptionFlags.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), sharedSubscription.getShareName());
                }
            }
        }
    } catch (final Exception ex) {
        if (ex instanceof InterruptedException) {
            // Restore the interrupt flag so the interruption is not lost to callers.
            Thread.currentThread().interrupt();
        }
        log.error("Failed to bootstrap topic tree.", ex);
    }
}
/**
 * Sends a log item with 5 retries and at most 2 reserved attempts configured, and verifies that
 * the send fails with {@code RequestError}, that only the configured number of attempts is
 * reserved, and that the attempt count equals retries + 1. Finally closes the producer and
 * checks its final state.
 */
@Test
public void testSendWithRequestError2() throws InterruptedException, ProducerException {
  ProducerConfig producerConfig = new ProducerConfig();
  int retries = 5;
  int maxReservedAttempts = 2;
  producerConfig.setRetries(retries);
  producerConfig.setMaxReservedAttempts(maxReservedAttempts);
  Producer producer = new LogProducer(producerConfig);
  producer.putProjectConfig(buildProjectConfig());
  ListenableFuture<Result> f = producer.send("project", "logStore", ProducerTest.buildLogItem());
  try {
    f.get();
    // Without this the test would pass silently if the send unexpectedly succeeded.
    Assert.fail("Expected the send to fail with RequestError");
  } catch (ExecutionException e) {
    ResultFailedException resultFailedException = (ResultFailedException) e.getCause();
    Result result = resultFailedException.getResult();
    Assert.assertFalse(result.isSuccessful());
    Assert.assertEquals("RequestError", result.getErrorCode());
    Assert.assertTrue(
        result.getErrorMessage().startsWith("Web request failed: project.endpoint"));
    List<Attempt> attempts = result.getReservedAttempts();
    Assert.assertEquals(maxReservedAttempts, attempts.size());
    Assert.assertEquals(retries + 1, result.getAttemptCount());
    for (Attempt attempt : attempts) {
      Assert.assertFalse(attempt.isSuccess());
      Assert.assertEquals("RequestError", attempt.getErrorCode());
      Assert.assertTrue(
          attempt.getErrorMessage().startsWith("Web request failed: project.endpoint"));
      Assert.assertEquals("", attempt.getRequestId());
    }
  }
  producer.close();
  ProducerTest.assertProducerFinalState(producer);
}
/**
 * Blocking variant of {@code insert}: submits the message and waits for the resulting status.
 *
 * @param message the insert message to send
 * @return the status produced by the insert, or {@code INTERRUPTED_INSERT} if waiting was
 *     interrupted or the insert failed; on interruption the thread's interrupt flag is restored
 */
public InsertStatus insertBlocking(InsertMessage message) {
  ListenableFuture<InsertStatus> future = this.insert(message);
  try {
    return future.get();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    LOGGER.error("error in insertBlocking: {}", LogHelper.getStackTrace(e));
    return INTERRUPTED_INSERT;
  } catch (ExecutionException e) {
    LOGGER.error("error in insertBlocking: {}", LogHelper.getStackTrace(e));
    return INTERRUPTED_INSERT;
  }
}
/**
 * Purges every purgeable recording of one (purge time, partition) row in parallel and, when all
 * of them report success and this is not a dry run, deletes the row's purge metadata. When any
 * purge fails or reports {@code false}, the row is left untouched so the purge can be attempted
 * again later.
 *
 * @param purgeTime   the purge time identifying the row
 * @param partitionId the partition identifying the row
 */
private void doPurgeRow(Date purgeTime, int partitionId) throws Exception {
   List<ListenableFuture<Boolean>> purgeResults = new LinkedList<>();
   videoPurgeDao.listPurgeableRecordings(purgeTime, partitionId).forEach(r -> {
      purgeResults.add(exec.submit(() -> { return doPurgeRecording(purgeTime, partitionId, r); }));
   });

   try {
      // Wait for all of the submitted tasks to complete. If any of them fail an exception
      // will be thrown and the code below that clears out the metadata for the row will not
      // execute. This allows the purge to be attempted again in the future.
      ListenableFuture<List<Boolean>> results = Futures.allAsList(purgeResults);
      List<Boolean> res = results.get();
      if (!res.stream().allMatch((b) -> b)) {
         log.debug("did not purge all recordings from: date={}, partition={} (will attempt again in future)", purgeTime, partitionId);
         return;
      }

      if (purgeConfig.isPurgeDryRun()) {
         log.debug("purged {} recordings from: date={}, partition={} (dryrun)", res.size(), purgeTime, partitionId);
      } else {
         videoPurgeDao.deletePurgeableRow(purgeTime, partitionId);
         log.debug("purged {} recordings from: date={}, partition={}", res.size(), purgeTime, partitionId);
      }
   } catch (Exception ex) {
      if (ex instanceof InterruptedException) {
         // Restore the interrupt flag so the interruption is not lost to callers.
         Thread.currentThread().interrupt();
      }
      // Pass the exception as the trailing argument so the failure cause is logged too.
      log.debug("failed to purge some recordings from: date={}, partition={} (will attempt again in future)", purgeTime, partitionId, ex);
   }
}
/**
 * Runs a boolean query built from {@code fieldName} and {@code value} and extracts the dense
 * vectors stored under {@code vectorName} from the rows of the first response.
 *
 * <p>Rows that lack the column, hold a non-vector value, hold a non-dense vector, or hold an
 * empty vector are skipped.
 *
 * @param fieldName  field used in the where-clause of the query
 * @param value      value used in the where-clause of the query
 * @param vectorName name of the column holding the feature vector
 * @return the extracted vectors; an empty list if waiting for the query fails, the result has no
 *     responses, or the response acknowledgement is not OK
 */
@Override
public List<float[]> getFeatureVectors(String fieldName, String value, String vectorName) {
  QueryMessage qbqm = this.mb.buildQueryMessage(ADAMproMessageBuilder.DEFAULT_HINT, this.fromMessage, this.mb.buildBooleanQueryMessage(this.mb.buildWhereMessage(fieldName, value)), null, null);
  ListenableFuture<QueryResultsMessage> f = this.adampro.booleanQuery(qbqm);
  ArrayList<float[]> vectors = new ArrayList<>();

  QueryResultsMessage r;
  try {
    r = f.get();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    LOGGER.error(LogHelper.getStackTrace(e));
    return vectors;
  } catch (ExecutionException e) {
    LOGGER.error(LogHelper.getStackTrace(e));
    return vectors;
  }

  if (r.getResponsesCount() == 0) {
    return vectors;
  }

  QueryResultInfoMessage response = r.getResponses(0); // only head (end-result) is important

  AckMessage ack = response.getAck();
  if (ack.getCode() != Code.OK) {
    LOGGER.error("error in getFeatureVectors on entity {}, ({}) : {}", entityName, ack.getCode(), ack.getMessage());
    return vectors;
  }

  for (QueryResultTupleMessage result : response.getResultsList()) {
    Map<String, DataMessage> data = result.getDataMap();

    if (!data.containsKey(vectorName)) {
      continue;
    }

    DataMessage dm = data.get(vectorName);
    if (dm.getDatatypeCase() != DataMessage.DatatypeCase.VECTORDATA) {
      continue;
    }

    VectorMessage featureData = dm.getVectorData();
    if (featureData.getVectorCase() != VectorMessage.VectorCase.DENSEVECTOR) {
      continue; // TODO add correct handling for sparse and int vectors
    }

    DenseVectorMessage dense = featureData.getDenseVector();
    List<Float> list = dense.getVectorList();
    if (list.isEmpty()) {
      continue;
    }

    // Unbox the components into a primitive array.
    float[] vector = new float[list.size()];
    int i = 0;
    for (float x : list) {
      vector[i++] = x;
    }
    vectors.add(vector);
  }

  return vectors;
}
/**
 * Checks that a {@code borrowBatchAsync} call whose caller-supplied function throws does not
 * leave pending offer futures blocked, both before and after the queue is finished.
 */
@Test(timeOut = 10_000)
public void testBorrowThrows()
throws Exception
{
// It doesn't matter the exact behavior when the caller-supplied function to borrow fails.
// However, it must not block pending futures.
AsyncQueue<Integer> queue = new ThrottledAsyncQueue<>(100, 4, executor);
// Offers 1-5: the returned futures are not inspected.
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);
// The future for the sixth offer is still pending.
ListenableFuture<?> future1 = queue.offer(6);
assertFalse(future1.isDone());
// Borrows with a function that always throws; reused below after finish().
Runnable runnable = () -> {
getFutureValue(queue.borrowBatchAsync(1, elements -> {
throw new RuntimeException("test fail");
}));
};
// The failure surfaces as an ExecutionException from the executor's future.
assertThatThrownBy(() -> executor.submit(runnable).get())
.isInstanceOf(ExecutionException.class)
.hasMessageContaining("test fail");
// The failed borrow completes neither the earlier offer future nor a new one.
ListenableFuture<?> future2 = queue.offer(7);
assertFalse(future1.isDone());
assertFalse(future2.isDone());
// After finish(), waiting on both pending offer futures returns.
queue.finish();
future1.get();
future2.get();
// Offers made after finish() return futures that are already done.
assertTrue(queue.offer(8).isDone());
// A failing borrow after finish() still reports the failure.
assertThatThrownBy(() -> executor.submit(runnable).get())
.isInstanceOf(ExecutionException.class)
.hasMessageContaining("test fail");
assertTrue(queue.offer(9).isDone());
assertFalse(queue.isFinished());
// 1 and 2 were removed by borrow call; 8 and 9 were never inserted because insertion happened after finish.
assertEquals(queue.getBatchAsync(100).get(), ImmutableList.of(3, 4, 5, 6, 7));
assertTrue(queue.isFinished());
}