下面列出了com.google.common.util.concurrent.ListenableFuture#addListener ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static void updateMemoryFuture(ListenableFuture<?> memoryPoolFuture, AtomicReference<SettableFuture<?>> targetFutureReference)
{
if (!memoryPoolFuture.isDone()) {
SettableFuture<?> currentMemoryFuture = targetFutureReference.get();
while (currentMemoryFuture.isDone()) {
SettableFuture<?> settableFuture = SettableFuture.create();
// We can't replace one that's not done, because the task may be blocked on that future
if (targetFutureReference.compareAndSet(currentMemoryFuture, settableFuture)) {
currentMemoryFuture = settableFuture;
}
else {
currentMemoryFuture = targetFutureReference.get();
}
}
SettableFuture<?> finalMemoryFuture = currentMemoryFuture;
// Create a new future, so that this operator can un-block before the pool does, if it's moved to a new pool
memoryPoolFuture.addListener(() -> finalMemoryFuture.set(null), directExecutor());
}
}
/**
* 定期的是去获取最新的server list
*/
public void refresh() {
try {
final ListenableFuture<String[][]> future = listAllServerUrls();
urlsFuture = future;
future.addListener(new Runnable() {
@Override
public void run() {
updateUrl(future, CHECK_UPDATE_URL, false);
updateUrl(future, LONG_POLLING_CHECK_UPDATE_URL, false);
updateUrl(future, LOAD_DATA_URL, false);
updateUrl(future, FORCE_RELOAD_URL, false);
updateUrl(future, LOAD_DATA_URL, true);
updateUrl(future, FORCE_RELOAD_URL, true);
updateUrl(future, RECORD_LOADING_URL, false);
updateUrl(future, GET_GROUP_FILES, false);
}
}, Constants.CURRENT_EXECUTOR);
} catch (Exception e) {
logger.debug("refresh failed", e);
}
}
private ListenableFuture<?> updateDriverBlockedFuture(ListenableFuture<?> sourceBlockedFuture)
{
// driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
// or any of the operators gets a memory revocation request
SettableFuture<?> newDriverBlockedFuture = SettableFuture.create();
driverBlockedFuture.set(newDriverBlockedFuture);
sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
// it's possible that memory revoking is requested for some operator
// before we update driverBlockedFuture above and we don't want to miss that
// notification, so we check to see whether that's the case before returning.
boolean memoryRevokingRequested = activeOperators.stream()
.filter(operator -> !revokingOperators.containsKey(operator))
.map(Operator::getOperatorContext)
.anyMatch(OperatorContext::isMemoryRevokingRequested);
if (memoryRevokingRequested) {
newDriverBlockedFuture.set(null);
}
return newDriverBlockedFuture;
}
/**
* convert ListenableFuture of Type S to CompletableFuture of Type T.
*/
static <S, T> CompletableFuture<T> toCompletableFuture(ListenableFuture<S> sourceFuture, Function<S, T> resultConvert,
Executor executor) {
CompletableFuture<T> targetFuture = new CompletableFuture<T>() {
// the cancel of targetFuture also cancels the sourceFuture.
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
return sourceFuture.cancel(mayInterruptIfRunning);
}
};
sourceFuture.addListener(() -> {
try {
targetFuture.complete(resultConvert.apply(sourceFuture.get()));
} catch (Exception e) {
targetFuture.completeExceptionally(toEtcdException(e));
}
}, executor);
return targetFuture;
}
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
}, directExecutor());
return loadingFuture;
}
@Override
public ListenableFuture<IKey> resolveKeyAsync(String kid) {
KeyIdentifier keyIdentifier = new KeyIdentifier(kid);
if (keyIdentifier.version() == null) {
final ListenableFuture<IKey> key = keyResolver.resolveKeyAsync(kid);
key.addListener(new Runnable() {
@Override
public void run() {
try {
cache.put(key.get().getKid(), key);
} catch (Exception e) {
// Key caching will occur on first read
}
}
},
MoreExecutors.directExecutor()
);
return key;
} else {
return cache.getUnchecked(kid);
}
}
@VisibleForTesting
ListenableFuture<Void> uploadBlobAsync(HashCode hash, Chunker chunker) {
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
ListenableFuture<Void> uploadResult = uploadsInProgress.get(hash);
if (uploadResult == null) {
uploadResult = startAsyncUpload(hash, chunker);
uploadsInProgress.put(hash, uploadResult);
uploadResult.addListener(
() -> {
synchronized (lock) {
uploadsInProgress.remove(hash);
}
},
MoreExecutors.directExecutor());
}
return uploadResult;
}
}
@Override
public void recordResult(long startTime, ListenableFuture<Object> result)
{
invocations.incrementAndGet();
result.addListener(
() -> {
lastStartTime.set(startTime);
try {
result.get();
successes.incrementAndGet();
}
catch (Throwable throwable) {
failures.incrementAndGet();
}
},
directExecutor());
}
public void initCamera() {
// 获取 ProcessCameraProvider
ListenableFuture<ProcessCameraProvider> cameraProviderFuture = ProcessCameraProvider.getInstance(getContext());
cameraProviderFuture.addListener(() -> {
// 初始化 UseCase
initUseCase();
try {
ProcessCameraProvider cameraProvider = cameraProviderFuture.get();
cameraProvider.unbindAll();
// 绑定 UseCase 到相机
camera = cameraProvider.bindToLifecycle((LifecycleOwner) getContext(), CameraSelector.DEFAULT_BACK_CAMERA, preview, imageAnalyzer);
// 开始预览
preview.setSurfaceProvider(mPreviewView.createSurfaceProvider());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}, ContextCompat.getMainExecutor(getContext()));
}
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
}, directExecutor());
return loadingFuture;
}
@Override
public ListenableFuture<?> work() {
int publishingBatchNum = statusInfo.getFinishedBatchNum() + 1;
final List<Host> machines = statusInfo.getBatches().get(publishingBatchNum);
preparePush(machines);
ListenableFuture<?> f1 = executor.submit(new Runnable() {
@Override
public void run() {
push(machines);
}
});
ListenableScheduledFuture<?> f2 = executor.schedule(new Runnable() {
@Override
public void run() {
push(machines);
}
}, DELAY_TIME_MS, TimeUnit.MILLISECONDS);
ListenableFuture<List<Object>> workFuture = Futures.successfulAsList(f1, f2);
workFuture.addListener(new Runnable() {
@Override
public void run() {
accept(Command.next);
}
}, Constants.CURRENT_EXECUTOR);
return workFuture;
}
/**
* Turns a google ListenableFuture into a Java 7 CompletableFuture
*/
public static <T> CompletableFuture<T> toJava(ListenableFuture<T> l) {
CompletableFuture<T> f = new CompletableFuture<>();
l.addListener(() -> {
try {
f.complete(l.get());
} catch (Throwable x) {
f.completeExceptionally(x);
}
}, Runnable::run);
return f;
}
private boolean await(ListenableFuture future) {
if (future == null) {
return false;
}
future.addListener(this, executor);
return true;
}
private synchronized void sendNextRequest()
{
TaskStatus taskStatus = getTaskInfo().getTaskStatus();
if (!running) {
return;
}
// we already have the final task info
if (isDone(getTaskInfo())) {
stop();
return;
}
// if we have an outstanding request
if (future != null && !future.isDone()) {
return;
}
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<?> errorRateLimit = errorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendNextRequest, executor);
return;
}
HttpUriBuilder httpUriBuilder = uriBuilderFrom(taskStatus.getSelf());
URI uri = summarizeTaskInfo ? httpUriBuilder.addParameter("summarize").build() : httpUriBuilder.build();
Request request = prepareGet()
.setUri(uri)
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
.build();
errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequestStartNanos.set(System.nanoTime());
Futures.addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
}
private void postCreateDefaultExchangeTasks()
{
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
_messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
}
else
{
_messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
}
// propagate any exception thrown during recovery into HouseKeepingTaskExecutor to handle them accordingly
// TODO if message recovery fails we ought to be transitioning the VH into ERROR and releasing the thread-pools etc.
final ListenableFuture<Void> recoveryResult = _messageStoreRecoverer.recover(this);
recoveryResult.addListener(new Runnable()
{
@Override
public void run()
{
Futures.getUnchecked(recoveryResult);
}
}, _houseKeepingTaskExecutor);
State finalState = State.ERRORED;
try
{
initialiseHouseKeeping();
initialiseFlowToDiskChecking();
finalState = State.ACTIVE;
_acceptsConnections.set(true);
}
finally
{
setState(finalState);
reportIfError(getState());
}
}
/**
* Deletes the given node if the given future failed.
*/
private void deleteNodeOnFailure(final ListenableFuture<?> future, final String node) {
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
zkClient.delete(node);
}
}
}, Threads.SAME_THREAD_EXECUTOR);
}
@Override
public void process(RemotingHeader header, final Object command, final ResponseHandler handler) {
try {
final int code = header.getCode();
final String id = header.getId();
final TaskFactory<?> factory = taskFactories.get(code);
Preconditions.checkState(factory != null);
logger.info("receive {} command, id [{}], command [{}]", factory.name(), id, command);
RunnableTask task = createTask(factory, header, command, handler);
if (task == null) {
return;
}
ListenableFuture<Integer> future = task.execute();
future.addListener(new Runnable() {
@Override
public void run() {
taskStore.finish(id);
}
}, MoreExecutors.directExecutor());
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
int eof = result == null ? 0 : result;
handler.handleEOF(eof);
logger.info("{} command finish, id [{}], command [{}]", factory.name(), id, command);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof CancellationException) {
logger.info("{} command canceled, id [{}]", factory.name(), id);
return;
}
handler.handleError(t);
logger.error("{} command error, id [{}], command [{}]", factory.name(), id, command, t);
}
}, AgentRemotingExecutor.getExecutor());
} catch (Exception e) {
handler.handleError(e);
logger.error("task process error", e);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RequestData)) {
super.channelRead(ctx, msg);
return;
}
@SuppressWarnings("unchecked")
RequestData<String> inputData = (RequestData<String>) msg;
UiConnection uiConnection = uiConnectionStore.register(ctx.channel());
if (inputData.getType() == CommandCode.REQ_TYPE_CANCEL.getCode()) {
cancelRequest(uiConnection);
return;
}
Optional<CommunicateCommand> command = commandStore.getCommunicateCommand(inputData.getType());
if (!command.isPresent()) {
ctx.channel().writeAndFlush(UiResponses.createNoCommandResponse(inputData));
return;
}
CommunicateCommand communicateCommand = command.get();
if (!communicateCommand.isSupportMulti() && inputData.getAgentServerInfos().size() > 1) {
ctx.channel().writeAndFlush(UiResponses.createNotSupportMultiResponse(inputData));
return;
}
CommunicateCommandProcessor<?> processor = communicateCommand.getProcessor();
Optional<? extends RequestData<?>> requestDataOptional = preProcessor(processor, inputData, ctx);
if (!requestDataOptional.isPresent()) {
return;
}
RequestData<?> requestData = requestDataOptional.get();
List<AgentConnection> agentConnections = Lists.newArrayListWithCapacity(requestData.getAgentServerInfos().size());
List<String> lessVersionAgents = Lists.newArrayList();
List<String> noConnectionAgents = Lists.newArrayList();
for (AgentServerInfo agentServerInfo : requestData.getAgentServerInfos()) {
Optional<AgentConnection> agentConnection = agentConnectionStore.getConnection(agentServerInfo.getAgentId());
if (agentConnection.isPresent()) {
if (agentConnection.get().getVersion() >= communicateCommand.getMinAgentVersion()) {
agentConnections.add(agentConnection.get());
} else {
lessVersionAgents.add(agentServerInfo.getAgentId());
}
} else {
noConnectionAgents.add(agentServerInfo.getAgentId());
}
}
noConnectionAgents.stream()
.map(noConnectionAgent -> UiResponses.createNoConnectionResponse(noConnectionAgent, requestData))
.forEach(uiConnection::write);
lessVersionAgents.stream().
map(lessVersionAgent -> UiResponses.createLessVersionResponse(lessVersionAgent, requestData))
.forEach(uiConnection::write);
if (agentConnections.isEmpty()) {
uiConnection.write(UiResponses.createFinishResponse(requestData));
return;
}
List<Session> sessions = agentConnections.stream()
.map((agentConnection -> sendMessage(command.get(), requestData, processor, agentConnection, uiConnection)))
.collect(Collectors.toList());
ListenableFuture<List<Session.State>> sessionsFuture = Futures.successfulAsList(sessions.stream().map(Session::getEndState).collect(Collectors.toList()));
sessionsFuture.addListener(() -> uiConnection.write(UiResponses.createFinishResponse(requestData)), MoreExecutors.directExecutor());
}
/**
* Runs the given task on the specified executor (defaulting to BlazeExecutor's executor) with a
* progress dialog.
*/
public <T> ListenableFuture<T> submitTaskWithResult(ProgressiveWithResult<T> progressive) {
// The progress indicator must be created on the UI thread.
final ProgressWindow indicator =
UIUtil.invokeAndWaitIfNeeded(
() -> {
if (modality == Modality.MODAL) {
ProgressWindow window = new ProgressWindow(cancelable, project);
window.setTitle(title);
return window;
} else {
PerformInBackgroundOption backgroundOption =
modality == Modality.BACKGROUNDABLE
? PerformInBackgroundOption.DEAF
: PerformInBackgroundOption.ALWAYS_BACKGROUND;
return new BackgroundableProcessIndicator(
project, title, backgroundOption, "Cancel", "Cancel", cancelable);
}
});
indicator.setIndeterminate(true);
indicator.start();
final ListenableFuture<T> future =
executor.submit(
() ->
ProgressManager.getInstance()
.runProcess(() -> progressive.compute(indicator), indicator));
if (cancelable) {
indicator.addStateDelegate(
new AbstractProgressIndicatorExBase() {
@Override
public void cancel() {
super.cancel();
future.cancel(true);
}
});
}
future.addListener(
() -> {
if (indicator.isRunning()) {
indicator.stop();
indicator.processFinish();
}
},
MoreExecutors.directExecutor());
return future;
}
@Override
public void perform(UiController controller, View view) {
WebView webView = (WebView) view;
if (Build.VERSION.SDK_INT >= 23 && !webView.isHardwareAccelerated()) {
throw new PerformException.Builder()
.withViewDescription(webView.toString())
.withCause(
new RuntimeException("Hardware acceleration is not supported on current device"))
.build();
}
List<Object> arguments = checkNotNull(atom.getArguments(element));
String script = checkNotNull(atom.getScript());
final ListenableFuture<Evaluation> localEval =
JavascriptEvaluation.evaluate(webView, script, arguments, window);
if (null != window && Build.VERSION.SDK_INT == 19) {
Log.w(
TAG,
"WARNING: KitKat does not report when an iframe is loading new content. "
+ "If you are interacting with content within an iframe and that content is changing "
+ "(eg: you have just pressed a submit button). Espresso will not be able to block "
+ "you until the new content has loaded (which it can do on all other API levels). "
+ "You will need to have some custom polling / synchronization with the iframe in "
+ "that case.");
}
localEval.addListener(
new Runnable() {
@Override
public void run() {
try {
atomActionResultPropagator.setResult(localEval.get());
} catch (ExecutionException ee) {
reportException(ee.getCause());
} catch (InterruptedException ie) {
reportException(ie);
} catch (RemoteException re) {
reportException(re);
}
}
},
MoreExecutors.directExecutor());
}