下面列出了怎么用com.google.common.util.concurrent.ListenableFuture的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Executes a query to retrieve all the table rows
*
* @param query The Query instance to execute
*/
public ListenableFuture<MobileServiceList<E>> execute(Query query) {
final SettableFuture<MobileServiceList<E>> future = SettableFuture.create();
ListenableFuture<JsonElement> internalFuture = mInternalTable.execute(query);
Futures.addCallback(internalFuture, new FutureCallback<JsonElement>() {
@Override
public void onFailure(Throwable exc) {
future.setException(exc);
}
@Override
public void onSuccess(JsonElement result) {
processQueryResults(result, future);
}
}, MoreExecutors.directExecutor());
return future;
}
/**
* Unregisters the client for native notifications
*
* @param callback The operation callback
* @deprecated use {@link #unregister()} instead
*/
public void unregister(final UnregisterCallback callback) {
ListenableFuture<Void> deleteInstallationFuture = deleteInstallation();
Futures.addCallback(deleteInstallationFuture, new FutureCallback<Void>() {
@Override
public void onFailure(Throwable exception) {
if (exception instanceof Exception) {
callback.onUnregister((Exception) exception);
}
}
@Override
public void onSuccess(Void v) {
callback.onUnregister(null);
}
}, MoreExecutors.directExecutor());
}
@Override
public void recordResult(long startTime, ListenableFuture<Object> result)
{
invocations.incrementAndGet();
result.addListener(
() -> {
lastStartTime.set(startTime);
try {
result.get();
successes.incrementAndGet();
}
catch (Throwable throwable) {
failures.incrementAndGet();
}
},
directExecutor());
}
private static <T> ListenableFuture<T> batchRequest(
AbstractGoogleJsonClientRequest<T> requestToExecute,
RetryPolicy retryPolicy,
BatchRequestService batchService)
throws IOException {
AsyncRequest<T> request =
new AsyncRequest<>(requestToExecute, retryPolicy, GROUP_SERVICE_STATS);
try {
API_RATE_LIMITER.acquire();
batchService.add(request);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while batching request", e);
}
return request.getFuture();
}
private ListenableFuture<HealthCheckResponse> ping() {
PlatformMessage msg = PlatformMessage.buildRequest(MessageBody.ping(), AlexaUtil.ADDRESS_BRIDGE, STATUS_SERVICE)
.withCorrelationId(IrisUUID.randomUUID().toString())
.create();
ListenableFuture<PlatformMessage> future = platSvc.request(
msg,
(pm) -> Objects.equals(msg.getCorrelationId(), pm.getCorrelationId()), config.getHealthCheckTimeoutSecs()
);
return Futures.transform(future, (Function<PlatformMessage, HealthCheckResponse>) input -> {
HealthCheckResponse response = new HealthCheckResponse();
response.setHealthy(true);
response.setDescription("The system is currently healthy");
return response;
}, MoreExecutors.directExecutor());
}
public ListenableFuture<String>
sendTransaction(final List<byte[]> sigs, final Object twoFacData) {
// FIXME: The server should return the full limits including is_fiat from send_tx
return Futures.transform(mClient.sendTransaction(sigs, twoFacData),
new Function<String, String>() {
@Override
public String apply(final String txHash) {
try {
mLimitsData = mClient.getSpendingLimits();
} catch (final Exception e) {
// We don't know what the new limit is so nuke it
mLimitsData.mData.put("total", 0);
e.printStackTrace();
}
return txHash;
}
}, mExecutor);
}
/** Removes {@link IdentityGroup} from Cloud Identity Groups API using {@code service}. */
@Override
public ListenableFuture<Boolean> unmap(IdentityService service) throws IOException {
ListenableFuture<Operation> deleteGroup = service.deleteGroup(groupResourceName.get());
return Futures.transform(
deleteGroup,
new Function<Operation, Boolean>() {
@Override
@Nullable
public Boolean apply(@Nullable Operation input) {
try {
validateOperation(input);
return true;
} catch (IOException e) {
logger.log(Level.WARNING, String.format("Failed to delete Group %s", groupKey), e);
return false;
}
}
},
getExecutor());
}
@Override
public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(AlarmId alarmId) {
log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId);
validateId(alarmId, "Incorrect alarmId " + alarmId);
return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()),
(AsyncFunction<Alarm, AlarmInfo>) alarm1 -> {
AlarmInfo alarmInfo = new AlarmInfo(alarm1);
return Futures.transform(
entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
originatorName -> {
alarmInfo.setOriginatorName(originatorName);
return alarmInfo;
}
);
});
}
@Override
public ListenableFuture<?> processFor(Duration duration)
{
Driver driver;
synchronized (this) {
// if close() was called before we get here, there's not point in even creating the driver
if (closed) {
return Futures.immediateFuture(null);
}
if (this.driver == null) {
this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
}
driver = this.driver;
}
return driver.processFor(duration);
}
@Test
public void testGetChildren_NewChild()
{
TestCar car = _model.getObjectFactory().create(TestCar.class, Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "myCar"), null);
String engineName = "myEngine";
Map<String, Object> engineAttributes = new HashMap<>();
engineAttributes.put(ConfiguredObject.NAME, engineName);
TestEngine engine = (TestEngine) car.createChild(TestEngine.class, engineAttributes);
// Check we can observe the new child from the parent
assertEquals((long) 1, (long) car.getChildren(TestEngine.class).size());
assertEquals(engine, car.getChildById(TestEngine.class, engine.getId()));
assertEquals(engine, car.getChildByName(TestEngine.class, engine.getName()));
ListenableFuture attainedChild = car.getAttainedChildByName(TestEngine.class, engine.getName());
assertNotNull(attainedChild);
assertTrue("Engine should have already attained state", attainedChild.isDone());
}
/**
* Refreshes the value associated with {@code key}, unless another thread is already doing so.
* Returns the newly refreshed value associated with {@code key} if it was refreshed inline, or
* {@code null} if another thread is performing the refresh or if an error occurs during
* refresh.
*/
@Nullable
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
final LoadingValueReference<K, V> loadingValueReference = insertLoadingValueReference(key, hash, checkTime);
if (loadingValueReference == null) {
return null;
}
ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
if (result.isDone()) {
try {
return Uninterruptibles.getUninterruptibly(result);
} catch (Throwable t) {
// don't let refresh exceptions propagate; error was already logged
}
}
return null;
}
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
}, directExecutor());
return loadingFuture;
}
@Override
public ListenableFuture<String> apply(final PreparedScript in) {
if (null != in.conduit) {
return Futures.<String>immediateFailedFuture(
new RuntimeException("Conduit script cannot be used"));
} else {
final ValueCallbackFuture<String> result = new ValueCallbackFuture<String>();
if (Looper.myLooper() == Looper.getMainLooper()) {
in.view.evaluateJavascript(in.script, result);
} else {
in.view.post(
new Runnable() {
@Override
public void run() {
in.view.evaluateJavascript(in.script, result);
}
});
}
return result;
}
}
public ListenableFuture<?> publishAsync(final Exchange exchange, final Message message, final @Nullable BasicProperties properties, final @Nullable Publish publish) {
// NOTE: Serialization must happen synchronously, because getter methods may not be thread-safe
final String payload = gson.toJson(message);
final AMQP.BasicProperties finalProperties = getProperties(message, properties);
final Publish finalPublish = Publish.forMessage(message, publish);
if(this.executorService == null) throw new IllegalStateException("Not connected");
return this.executorService.submit(new Runnable() {
@Override
public void run() {
try {
publish(exchange, payload, finalProperties, finalPublish);
} catch(Throwable e) {
logger.log(Level.SEVERE, "Unhandled exception publishing message type " + finalProperties.getType(), e);
}
}
});
}
@Override
public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
ListenableFuture<List<Device>> devices = Futures.transformAsync(relations, r -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<Device>> futures = new ArrayList<>();
for (EntityRelation relation : r) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == EntityType.DEVICE) {
futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId())));
}
}
return Futures.successfulAsList(futures);
});
devices = Futures.transform(devices, new Function<List<Device>, List<Device>>() {
@Nullable
@Override
public List<Device> apply(@Nullable List<Device> deviceList) {
return deviceList == null ? Collections.emptyList() : deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList());
}
});
return devices;
}
private static ListenableFuture<MutationResult> addData(final Datastore datastore) {
final Insert insert = QueryBuilder.insert("employee", 1234567L)
.value("fullname", "Fred Blinge")
.value("inserted", new Date())
.value("age", 40);
return datastore.executeAsync(insert);
}
@Nonnull
@Override
public ListenableFuture<Boolean> commitOperation(@Nonnull RaftStateContext ctx, @Nonnull byte[] operation) throws RaftException {
resetTimeout(ctx);
long index = log.append(operation);
SettableFuture<Boolean> f = SettableFuture.create();
requests.put(index, f);
return commit(ctx);
}
@Override
public void writeRecord(OUT record) throws IOException {
if (exception != null) {
throw new IOException("write record failed", exception);
}
ListenableFuture<Void> result = mapper.saveAsync(record);
Futures.addCallback(result, callback);
}
@Override
public ListenableFuture<FindMultiResponse<Session>> staff(ServerDoc.Network network, boolean disguised) {
return client().get(new QueryUri(collectionUri())
.put("network", network)
.put("staff", true)
.put("online", true)
.put("disguised", disguised)
.encode(),
meta.multiResponseType(),
HttpOption.INFINITE_RETRY);
}
private static Function<Object, ListenableFuture<MessageBody>> extractResponseWrapper(CommandDefinition command, Method m) {
boolean methodIsAsync = ListenableFuture.class.equals(m.getReturnType());
Type returnType = m.getGenericReturnType();
Map<String, AttributeDefinition> returnDefinitions = command.getReturnParameters();
if(returnDefinitions.size() > 0 && !returnDefinitions.containsKey("response") && !returnDefinitions.containsKey("attributes")) {
throw new IllegalArgumentException("The reflective command handler only supports commands with no return parameters, or 1 return parameter named 'response' or 'attribute'");
}
AttributeType responseType = VoidType.INSTANCE;
if(returnDefinitions.containsKey("response")) {
responseType = returnDefinitions.get("response").getAttributeType();
} else if(returnDefinitions.containsKey("attributes")) {
responseType = returnDefinitions.get("attributes").getAttributeType();
}
if(methodIsAsync) {
if(returnType instanceof ParameterizedType) {
returnType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
}
else if(returnType instanceof Class) {
LOGGER.warn("Missing type information due to erasures, possible cast issues");
returnType = responseType.getJavaType();
}
else {
throw new IllegalArgumentException("Invalid return type [" + m.getGenericReturnType() + "] expected [" + responseType + "] or [ListenableFuture<" + responseType + ">]");
}
}
if(!responseType.isAssignableFrom(returnType)) {
throw new IllegalArgumentException("Invalid return type [" + m.getGenericReturnType() + "] expected [" + responseType + "] or [ListenableFuture<" + responseType + ">]");
}
com.google.common.base.Function<Object, MessageBody> translator = createTranslator(command);
if(methodIsAsync) {
return (o) -> ReflectiveCommandHandler.translateFromAsync(o, translator);
}
else {
return (o) -> ReflectiveCommandHandler.translateFromSync(o, translator);
}
}
@Test
public void testDestroyFreesReader()
{
PartitionedOutputBuffer buffer = createPartitionedBuffer(
createInitialEmptyOutputBuffers(PARTITIONED)
.withBuffer(FIRST, 0)
.withNoMoreBufferIds(),
sizeOfPages(5));
assertFalse(buffer.isFinished());
// attempt to get a page
ListenableFuture<BufferResult> future = buffer.get(FIRST, 0, sizeOfPages(10));
// verify we are waiting for a page
assertFalse(future.isDone());
// add one page
addPage(buffer, createPage(0));
// verify we got one page
assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), bufferResult(0, createPage(0)));
// attempt to get another page, and verify we are blocked
future = buffer.get(FIRST, 1, sizeOfPages(10));
assertFalse(future.isDone());
// destroy the buffer
buffer.destroy();
assertQueueClosed(buffer, FIRST, 1);
// verify the future completed
assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 1, false));
}
@Override
public ListenableFuture<?> isFull()
{
OutputBuffer outputBuffer;
synchronized (this) {
checkState(delegate != null, "Buffer has not been initialized");
outputBuffer = delegate;
}
return outputBuffer.isFull();
}
@SuppressWarnings("WeakerAccess")
void tryToProvidePreviewSurface() {
/*
Should only continue if:
- The preview size has been specified.
- The textureView's surfaceTexture is available (after TextureView
.SurfaceTextureListener#onSurfaceTextureAvailable is invoked)
- The surfaceCompleter has been set (after CallbackToFutureAdapter
.Resolver#attachCompleter is invoked).
*/
if (mResolution == null || mSurfaceTexture == null || mSurfaceRequest == null) {
return;
}
mSurfaceTexture.setDefaultBufferSize(mResolution.getWidth(), mResolution.getHeight());
final Surface surface = new Surface(mSurfaceTexture);
final ListenableFuture<Result> surfaceReleaseFuture =
CallbackToFutureAdapter.getFuture(completer -> {
mSurfaceRequest.provideSurface(surface,
CameraXExecutors.directExecutor(), completer::set);
return "provideSurface[request=" + mSurfaceRequest + " surface=" + surface
+ "]";
});
mSurfaceReleaseFuture = surfaceReleaseFuture;
mSurfaceReleaseFuture.addListener(() -> {
surface.release();
if (mSurfaceReleaseFuture == surfaceReleaseFuture) {
mSurfaceReleaseFuture = null;
}
}, ContextCompat.getMainExecutor(mTextureView.getContext()));
mSurfaceRequest = null;
correctPreviewForCenterCrop(mParent, mTextureView, mResolution);
}
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final List<InputGroup<I>> inputGroups, final ExecutorCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (InputGroup<I> each : inputGroups) {
result.add(asyncExecute(each, callback));
}
return result;
}
QuorumCall<AsyncLogger, Void> doPreUpgrade() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doPreUpgrade();
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
/**
* Asks the connected peer for the given transaction from its memory pool. Transactions in the chain cannot be
* retrieved this way because peers don't have a transaction ID to transaction-pos-on-disk index, and besides,
* in future many peers will delete old transaction data they don't need.
*/
@SuppressWarnings("unchecked")
// The 'unchecked conversion' warning being suppressed here comes from the sendSingleGetData() formally returning
// ListenableFuture instead of ListenableFuture<Transaction>. This is okay as sendSingleGetData() actually returns
// ListenableFuture<Transaction> in this context. Note that sendSingleGetData() is also used for Blocks.
public ListenableFuture<Transaction> getPeerMempoolTransaction(Sha256Hash hash) {
// This does not need to be locked.
// TODO: Unit test this method.
log.info("Request to fetch peer mempool tx {}", hash);
GetDataMessage getdata = new GetDataMessage(params);
getdata.addTransaction(hash);
return sendSingleGetData(getdata);
}
@Override
public synchronized ListenableFuture<String> await()
{
createFuture.set(null);
if (awaitFuture == null) {
awaitFuture = SettableFuture.create();
}
return awaitFuture;
}
private ListenableFuture<Void> linkDirectory(
Path execPath, Digest digest, Map<Digest, Directory> directoriesIndex) {
return transformAsync(
fileCache.putDirectory(digest, directoriesIndex, fetchService),
(cachePath) -> {
Files.createSymbolicLink(execPath, cachePath);
return immediateFuture(null);
},
fetchService);
}
/**
* Starts an asynchronous upload and returns a ListenableFuture for handling the result.
*/
synchronized ListenableFuture<String> upload() {
// Reset values from possible prior attempt
_attempts += 1;
_bytesTransferred = 0;
// Separate the future returned to the caller from the future generated by submitting the
// putObject request. If the writer is closed then uploadFuture may be canceled before it executes,
// in which case it may not trigger any callbacks. To ensure there is always a callback resultFuture is
// tracked independently and, in the event that the upload is aborted, gets set on abort().
_resultFuture = SettableFuture.create();
_uploadFuture = _uploadService.submit(new Runnable() {
@Override
public void run() {
try {
ProgressListener progressListener = new ProgressListener() {
@Override
public void progressChanged(ProgressEvent progressEvent) {
// getBytesTransferred() returns zero for all events not pertaining to the file transfer
_bytesTransferred += progressEvent.getBytesTransferred();
}
};
PutObjectRequest putObjectRequest = new PutObjectRequest(_bucket, _key, _file);
putObjectRequest.setGeneralProgressListener(progressListener);
PutObjectResult result = _amazonS3.putObject(putObjectRequest);
_resultFuture.set(result.getETag());
} catch (Throwable t) {
_resultFuture.setException(t);
}
}
});
return _resultFuture;
}
/**
* Populates the topic tree with all information from the ClientSessionPersistence
*/
private void populateTopicTree() {
final ListenableFuture<Set<String>> clientsFuture = clientSessionPersistence.getAllClients();
// Blocking. The TopicTreeStartup needs to be done before new connections are allowed.
try {
final Set<String> clients = clientsFuture.get();
for (final String client : clients) {
final Set<Topic> clientSubscriptions = clientSessionSubscriptionPersistence.getSubscriptions(client);
final ClientSession session = clientSessionPersistence.getSession(client, false);
if (session == null || session.getSessionExpiryInterval() == SESSION_EXPIRE_ON_DISCONNECT) {
// We don't have to remove the subscription from the topic tree, since it is not added to the topic tree yet.
clientSessionSubscriptionPersistence.removeAllLocally(client);
continue;
}
for (final Topic topic : clientSubscriptions) {
final SharedSubscription sharedSubscription = sharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
if (sharedSubscription == null) {
topicTree.addTopic(client, topic, SubscriptionFlags.getDefaultFlags(false, topic.isRetainAsPublished(), topic.isNoLocal()), null);
} else {
topicTree.addTopic(client, new Topic(sharedSubscription.getTopicFilter(), topic.getQoS(), topic.isNoLocal(), topic.isRetainAsPublished()), SubscriptionFlags.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), sharedSubscription.getShareName());
}
}
}
} catch (final Exception ex) {
log.error("Failed to bootstrap topic tree.", ex);
}
}