下面列出了 com.google.common.util.concurrent.SettableFuture#setFuture() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Submits {@code task} to {@code executor} once every future in {@code futures} has succeeded.
 *
 * <p>If any input future fails, the returned future is failed with the first reported
 * throwable and the task is never submitted; later failures are only logged. When no input
 * futures are given there is nothing to wait for, so the task is submitted immediately.
 *
 * @param task the work to run after all inputs have succeeded
 * @param executor runs both the completion callbacks and the task itself
 * @param futures the inputs that must all succeed before the task is submitted
 * @return a future that mirrors the submitted task, or fails with the first input failure
 */
private static <T> ListenableFuture<T> whenAllSucceed(
    Callable<T> task,
    ListeningScheduledExecutorService executor,
    ListenableFuture<?>... futures) {
  SettableFuture<T> taskFuture = SettableFuture.create();
  if (futures.length == 0) {
    // Without inputs no callback is ever registered, so the countdown below would never reach
    // zero and the returned future would never complete. Start the task right away instead.
    taskFuture.setFuture(executor.submit(task));
    return taskFuture;
  }
  // Create a countDown for all Futures to complete.
  AtomicInteger countDown = new AtomicInteger(futures.length);
  // Trigger the task when all futures are complete.
  FutureCallback<Object> runWhenInputAreComplete =
      new FutureCallback<Object>() {
        @Override
        public void onSuccess(@NullableDecl Object o) {
          // Only the callback that observes the last successful input submits the task.
          if (countDown.decrementAndGet() == 0) {
            taskFuture.setFuture(executor.submit(task));
          }
        }

        @Override
        public void onFailure(Throwable throwable) {
          // The first failure is propagated to the caller; any rejected one is just logged.
          if (!taskFuture.setException(throwable)) {
            String msg = "Got more than one input failure. Logging failures after the first";
            logger.log(Level.SEVERE, msg, throwable);
          }
        }
      };
  for (ListenableFuture<?> future : futures) {
    Futures.addCallback(future, runWhenInputAreComplete, executor);
  }
  return taskFuture;
}
/**
 * Exploratory test: logs the completion state of a {@code SettableFuture} that was delegated to
 * another, still incomplete, future via {@code setFuture}, and whether a later
 * {@code setException} call on it is accepted.
 */
@Test
public void test2() throws InterruptedException, ExecutionException, TimeoutException {
  SettableFuture<Object> future = SettableFuture.create();
  // Nothing has completed the future yet; log the captured state instead of re-querying it.
  boolean done = future.isDone();
  logger.debug("future done:{}", done);

  SettableFuture<Object> future2 = SettableFuture.create();
  future2.setFuture(future);
  logger.debug("future2 done:{}", future2.isDone());

  // The boolean is the "was this call accepted" result of setException, not an actual timeout.
  // NOTE(review): per Guava's setFuture contract this is expected to be false once a delegate
  // has been accepted — confirm against the Guava version in use.
  boolean exceptionAccepted = future2.setException(new RuntimeException("timeout"));
  logger.debug("timeout:{}", exceptionAccepted);
}
/**
 * De-duplicates job creation for {@code key} against both the in-flight job cache and the
 * computed-node cache.
 *
 * @param jobSupplier a supplier to use to create the actual job.
 * @return future describing the job. It is either the future another caller already registered
 *     for the key (job cache hit), an already completed future (result cache hit), or a future
 *     chained to a freshly created job (miss).
 */
protected final ListenableFuture<T> getJobWithCacheLookup(
    Cell cell, K key, JobSupplier<T> jobSupplier, BuckEventBus eventBus) {
  // A cheap placeholder decides the race between threads: whoever gets it into jobsCache owns
  // the job, everybody else simply returns the winner's future.
  SettableFuture<T> placeholder = SettableFuture.create();
  ListenableFuture<T> winner = jobsCache.putIfAbsent(key, placeholder);
  if (winner != null) {
    return winner;
  }

  // We own the entry: complete the placeholder either straight from the computed-node cache or
  // by chaining it to the real job, so every waiter observes the same result.
  try {
    Optional<T> alreadyComputed = cache.lookupComputedNode(cell, key, eventBus);
    if (!alreadyComputed.isPresent()) {
      ListenableFuture<T> computeJob = jobSupplier.get();
      ListenableFuture<T> storeInCacheJob =
          Futures.transformAsync(
              computeJob,
              computed -> {
                boolean isConfiguration = this.targetNodeIsConfiguration.test(computed);
                return Futures.immediateFuture(
                    cache.putComputedNodeIfNotPresent(
                        cell, key, computed, isConfiguration, eventBus));
              },
              MoreExecutors.directExecutor());
      placeholder.setFuture(storeInCacheJob);
    } else {
      placeholder.set(alreadyComputed.get());
    }
  } catch (Throwable t) {
    // Fail the published placeholder before propagating, so waiters are not left hanging.
    placeholder.setException(t);
    throw t;
  }
  return placeholder;
}
/**
 * Registers an upload for every supplier in {@code dataSupplier} and returns a future that
 * completes once all of them have succeeded.
 *
 * <p>Uploads are de-duplicated through {@code pendingUploads}, keyed by the digest hash: if an
 * entry already exists for a hash, its future is reused instead of scheduling a second upload.
 */
private ListenableFuture<Unit> enqueue(ImmutableList<UploadDataSupplier> dataSupplier) {
  Builder<ListenableFuture<Unit>> uploadFutures = ImmutableList.builder();
  for (UploadDataSupplier supplier : dataSupplier) {
    Digest digest = supplier.getDigest();
    SettableFuture<Unit> candidate = SettableFuture.create();
    ListenableFuture<Unit> alreadyPending =
        pendingUploads.putIfAbsent(digest.getHash(), candidate);

    // Somebody else already registered this digest: reuse their future.
    if (alreadyPending != null) {
      uploadFutures.add(alreadyPending);
      continue;
    }

    // The digest is already known to be contained: nothing to upload.
    if (containsDigest(digest)) {
      candidate.set(null);
      uploadFutures.add(candidate);
      continue;
    }

    // Queue the data for the missing-check, and remember the hash once the upload succeeded.
    SettableFuture<Unit> uploadDone = SettableFuture.create();
    waitingMissingCheck.add(new PendingUpload(supplier, uploadDone));
    ListenableFuture<Unit> uploadedAndRecorded =
        Futures.transform(
            uploadDone,
            ignored -> {
              addContainedHash(digest);
              return null;
            },
            directExecutor());
    candidate.setFuture(uploadedAndRecorded);
    // The pending entry is dropped again once the upload has finished.
    Futures.addCallback(
        candidate,
        MoreFutures.finallyCallback(() -> pendingUploads.remove(digest.getHash())),
        directExecutor());
    uploadService.submit(this::processUploads);
    uploadFutures.add(candidate);
  }
  return Futures.whenAllSucceed(uploadFutures.build()).call(() -> null, directExecutor());
}
/**
 * Starts discovery of {@code buildTarget} and, recursively, of its parse dependencies, unless
 * a job for that target is already registered in {@code jobsCache}.
 *
 * @return the newly registered job, or an empty {@code Optional} when a job for the target
 *     already exists in {@code jobsCache}
 */
private Optional<ListenableFuture<Unit>> discoverNewTargetsConcurrently(
    BuildTarget buildTarget,
    DependencyStack dependencyStack,
    ConcurrentHashMap<BuildTarget, ListenableFuture<Unit>> jobsCache)
    throws BuildFileParseException {
  // Cheap pre-check that avoids allocating a placeholder for targets already being handled.
  if (jobsCache.get(buildTarget) != null) {
    return Optional.empty();
  }

  SettableFuture<Unit> placeholder = SettableFuture.create();
  ListenableFuture<Unit> raced = jobsCache.putIfAbsent(buildTarget, placeholder);
  if (raced != null) {
    // Another caller registered the target between the pre-check and the insert.
    return Optional.empty();
  }

  ListenableFuture<Unit> discovery =
      Futures.transformAsync(
          parser.getTargetNodeJobAssertCompatible(parserState, buildTarget, dependencyStack),
          targetNode -> {
            targetsToNodes.put(buildTarget, targetNode);
            checker.addTarget(buildTarget, dependencyStack);

            // Only dependencies that produced a new job contribute a future to wait for.
            List<ListenableFuture<Unit>> pendingDeps = new ArrayList<>();
            for (BuildTarget parseDep : targetNode.getParseDeps()) {
              Optional<ListenableFuture<Unit>> depWork =
                  discoverNewTargetsConcurrently(
                      parseDep, dependencyStack.child(parseDep), jobsCache);
              if (depWork.isPresent()) {
                pendingDeps.add(
                    attachParentNodeToErrorMessage(buildTarget, parseDep, depWork.get()));
              }
            }
            return Futures.transform(
                Futures.allAsList(pendingDeps),
                Functions.constant(null),
                MoreExecutors.directExecutor());
          });
  placeholder.setFuture(discovery);
  return Optional.of(placeholder);
}
/**
 * Retrieves and deserializes the NestedSet contents associated with the given fingerprint.
 *
 * <p>We wish to only do one deserialization per fingerprint. This is enforced by the {@link
 * #nestedSetCache}, which is responsible for returning the actual contents or the canonical
 * future that will contain the results of the deserialization. If that future is not owned by the
 * current call of this method, it doesn't have to do anything further.
 *
 * <p>If starting the fetch throws, the future this call registered in the cache is failed with
 * the same throwable before it is rethrown, so that nobody who obtained that future from the
 * cache is left waiting on a future that would never complete.
 *
 * <p>The return value is either an {@code Object[]} or a {@code ListenableFuture<Object[]>}.
 */
// All callers will test on type and check return value if it's a future.
@SuppressWarnings("FutureReturnValueIgnored")
Object getContentsAndDeserialize(
    ByteString fingerprint, DeserializationContext deserializationContext) throws IOException {
  SettableFuture<Object[]> future = SettableFuture.create();
  Object contents = nestedSetCache.putIfAbsent(fingerprint, future);
  if (contents != null) {
    // The cache already holds contents (or another call's future) for this fingerprint.
    return contents;
  }
  ListenableFuture<byte[]> retrieved;
  try {
    retrieved = nestedSetStorageEndpoint.get(fingerprint);
  } catch (Throwable t) {
    // Our future is already visible through the cache; fail it rather than orphan it.
    future.setException(t);
    throw t;
  }
  Stopwatch fetchStopwatch = Stopwatch.createStarted();
  future.setFuture(
      Futures.transformAsync(
          retrieved,
          bytes -> {
            Duration fetchDuration = fetchStopwatch.elapsed();
            // Only fetches slower than the threshold are logged.
            if (FETCH_FROM_STORAGE_LOGGING_THRESHOLD.compareTo(fetchDuration) < 0) {
              logger.atInfo().log(
                  "NestedSet fetch took: %dms, size: %dB",
                  fetchDuration.toMillis(), bytes.length);
            }
            CodedInputStream codedIn = CodedInputStream.newInstance(bytes);
            int numberOfElements = codedIn.readInt32();
            DeserializationContext newDeserializationContext =
                deserializationContext.getNewMemoizingContext();
            // The elements of this list are futures for the deserialized values of these
            // NestedSet contents. For direct members, the futures complete immediately and yield
            // an Object. For transitive members (fingerprints), the futures complete with the
            // underlying fetch, and yield Object[]s.
            List<ListenableFuture<?>> deserializationFutures = new ArrayList<>();
            for (int i = 0; i < numberOfElements; i++) {
              Object deserializedElement = newDeserializationContext.deserialize(codedIn);
              if (deserializedElement instanceof ByteString) {
                deserializationFutures.add(
                    maybeWrapInFuture(
                        getContentsAndDeserialize(
                            (ByteString) deserializedElement, deserializationContext)));
              } else {
                deserializationFutures.add(Futures.immediateFuture(deserializedElement));
              }
            }
            // Once every element future is done, collect the results in their original order.
            return Futures.whenAllComplete(deserializationFutures)
                .call(
                    () -> {
                      Object[] deserializedContents = new Object[deserializationFutures.size()];
                      for (int i = 0; i < deserializationFutures.size(); i++) {
                        deserializedContents[i] = Futures.getDone(deserializationFutures.get(i));
                      }
                      return deserializedContents;
                    },
                    executor);
          },
          executor));
  return future;
}
/**
 * Returns the job producing all raw target nodes of {@code buildFile}.
 *
 * <p>The job is registered in {@code allNodeCache} under the build file; if a job is already
 * registered there, that one is returned and no new work is scheduled.
 */
public ListenableFuture<ImmutableList<UnconfiguredTargetNode>> getAllNodesJob(
    Cell cell, AbsPath buildFile) {
  SettableFuture<ImmutableList<UnconfiguredTargetNode>> placeholder = SettableFuture.create();
  ListenableFuture<ImmutableList<UnconfiguredTargetNode>> existing =
      allNodeCache.putIfAbsent(buildFile, placeholder);
  if (existing != null) {
    return existing;
  }

  try {
    ListenableFuture<List<UnconfiguredTargetNode>> nodeListJob =
        Futures.transformAsync(
            MoreFutures.combinedFutures(
                packagePipeline.getPackageJob(cell, buildFile),
                buildFileRawNodeParsePipeline.getFileJob(cell, buildFile),
                executorService),
            packageAndFile -> {
              ImmutableList<Map<String, Object>> rawNodes =
                  ImmutableList.copyOf(packageAndFile.getSecond().getTargets().values());

              if (shuttingDown()) {
                return Futures.immediateCancelledFuture();
              }

              // One per-target job for every raw node found in the build file.
              ImmutableList.Builder<ListenableFuture<UnconfiguredTargetNode>> nodeJobs =
                  ImmutableList.builderWithExpectedSize(rawNodes.size());
              for (Map<String, Object> rawNode : rawNodes) {
                UnconfiguredBuildTarget unconfiguredTarget =
                    UnconfiguredBuildTarget.of(
                        UnflavoredBuildTargetFactory.createFromRawNode(
                            cell.getRoot().getPath(),
                            cell.getCanonicalName(),
                            rawNode,
                            buildFile.getPath()),
                        FlavorSet.NO_FLAVORS);
                ListenableFuture<UnconfiguredTargetNode> nodeJob =
                    cache.getJobWithCacheLookup(
                        cell,
                        unconfiguredTarget,
                        () ->
                            dispatchComputeNode(
                                cell,
                                unconfiguredTarget,
                                DependencyStack.top(unconfiguredTarget),
                                rawNode,
                                packageAndFile.getFirst()),
                        eventBus);
                nodeJobs.add(nodeJob);
              }
              return Futures.allAsList(nodeJobs.build());
            },
            executorService);
    ListenableFuture<ImmutableList<UnconfiguredTargetNode>> immutableNodesJob =
        Futures.transform(nodeListJob, ImmutableList::copyOf, executorService);
    placeholder.setFuture(immutableNodesJob);
  } catch (Throwable t) {
    // Failures while setting up the job are reported through the returned future.
    placeholder.setException(t);
  }
  return placeholder;
}
/**
 * Get or load all target nodes from a build file configuring it with global platform
 * configuration or {@code default_target_platform} rule arg.
 *
 * <p>Jobs are cached in {@code allNodeCache} per (build file, global target configuration)
 * pair; an already registered job is returned as is.
 */
ListenableFuture<ImmutableList<TargetNodeMaybeIncompatible>> getAllRequestedTargetNodesJob(
    Cell cell, AbsPath buildFile, Optional<TargetConfiguration> globalTargetConfiguration) {
  SettableFuture<ImmutableList<TargetNodeMaybeIncompatible>> placeholder =
      SettableFuture.create();
  Pair<AbsPath, Optional<TargetConfiguration>> cacheKey =
      new Pair<>(buildFile, globalTargetConfiguration);
  ListenableFuture<ImmutableList<TargetNodeMaybeIncompatible>> existing =
      allNodeCache.putIfAbsent(cacheKey, placeholder);
  if (existing != null) {
    return existing;
  }

  try {
    ListenableFuture<List<TargetNodeMaybeIncompatible>> nodeListJob =
        Futures.transformAsync(
            unconfiguredTargetNodePipeline.getAllNodesJob(cell, buildFile),
            unconfiguredNodes -> {
              if (shuttingDown()) {
                return Futures.immediateCancelledFuture();
              }

              // Configure every unconfigured node of the build file.
              ImmutableList.Builder<ListenableFuture<TargetNodeMaybeIncompatible>> nodeJobs =
                  ImmutableList.builderWithExpectedSize(unconfiguredNodes.size());
              for (UnconfiguredTargetNode unconfigured : unconfiguredNodes) {
                nodeJobs.add(
                    configureRequestedTarget(
                        cell,
                        unconfigured.getBuildTarget(),
                        globalTargetConfiguration,
                        unconfigured));
              }
              return Futures.allAsList(nodeJobs.build());
            },
            executorService);
    ListenableFuture<ImmutableList<TargetNodeMaybeIncompatible>> immutableNodesJob =
        Futures.transform(nodeListJob, ImmutableList::copyOf, executorService);
    placeholder.setFuture(immutableNodesJob);
  } catch (Throwable t) {
    // Failures while setting up the job are reported through the returned future.
    placeholder.setException(t);
  }
  return placeholder;
}
/**
 * Returns the result future for {@code rule}, combined with the results of its runtime deps
 * when it has any.
 *
 * <p>The future is cached in the {@code results} map per build target, so repeated calls for
 * the same target share one future.
 */
private ListenableFuture<BuildResult> getBuildRuleResultWithRuntimeDeps(
    BuildRule rule, BuildEngineBuildContext buildContext, ExecutionContext executionContext) {
  // Fast path: a result future is already registered for this target.
  ListenableFuture<BuildResult> registered = results.get(rule.getBuildTarget());
  if (registered != null) {
    return registered;
  }

  // Atomically register our own placeholder; if somebody beat us to it, use theirs.
  SettableFuture<BuildResult> placeholder = SettableFuture.create();
  registered = results.putIfAbsent(rule.getBuildTarget(), placeholder);
  if (registered != null) {
    return registered;
  }

  // Result of the rule itself, computed once its rule key is available.
  ListenableFuture<RuleKey> ruleKeyFuture = calculateRuleKey(rule, buildContext);
  ListenableFuture<BuildResult> ownResult =
      Futures.transformAsync(
          ruleKeyFuture,
          input -> processBuildRule(rule, buildContext, executionContext),
          serviceByAdjustingDefaultWeightsTo(SCHEDULING_MORE_WORK_RESOURCE_AMOUNTS));

  // Rules that cannot have runtime deps just expose their own result.
  if (!(rule instanceof HasRuntimeDeps)) {
    placeholder.setFuture(ownResult);
    return placeholder;
  }

  // Request (recursively) the results of all runtime deps.
  List<ListenableFuture<BuildResult>> runtimeDepResults =
      ((HasRuntimeDeps) rule)
          .getRuntimeDeps(resolver)
          .map(resolver::getRule)
          .map(dep -> getBuildRuleResultWithRuntimeDeps(dep, buildContext, executionContext))
          .collect(ImmutableList.toImmutableList());

  // Nothing to combine with: short circuit.
  if (runtimeDepResults.isEmpty()) {
    placeholder.setFuture(ownResult);
    return placeholder;
  }

  // Combine: the first non-cancelled unsuccessful dep turns into a failure of this rule; if only
  // cancelled deps are found, the rule is reported as cancelled; otherwise the rule's own result
  // is used.
  ListenableFuture<BuildResult> combined =
      Futures.transformAsync(
          Futures.allAsList(runtimeDepResults),
          depResults -> {
            Optional<BuildResult> cancelled = Optional.empty();
            for (BuildResult depResult : depResults) {
              if (depResult.isSuccess()) {
                continue;
              }
              if (depResult.getStatus() != BuildRuleStatus.CANCELED) {
                return Futures.immediateFuture(
                    BuildResult.failure(rule, depResult.getFailure()));
              }
              cancelled = Optional.of(BuildResult.canceled(rule, depResult.getFailure()));
            }
            if (cancelled.isPresent()) {
              return Futures.immediateFuture(cancelled.get());
            }
            return ownResult;
          },
          MoreExecutors.directExecutor());
  placeholder.setFuture(combined);
  return placeholder;
}
/**
 * Calculates the rule key of {@code rule}, sharing one future per build target through the
 * {@code ruleKeys} map.
 *
 * @return a {@link ListenableFuture} wrapping the result of calculating the {@link RuleKey} of
 *     the given {@link BuildRule}.
 */
public ListenableFuture<T> calculate(BuckEventBus buckEventBus, BuildRule rule) {
  // Look up first so that the common "already requested" case allocates nothing.
  ListenableFuture<T> known = ruleKeys.get(rule.getBuildTarget());
  if (known != null) {
    return known;
  }

  // Atomically claim the slot for this target; losing the race means reusing the winner's future.
  SettableFuture<T> placeholder = SettableFuture.create();
  known = ruleKeys.putIfAbsent(rule.getBuildTarget(), placeholder);
  if (known != null) {
    return known;
  }

  T cachedKey = ruleKeyFactory.getFromCache(rule);
  if (cachedKey != null) {
    // Because a rule key will be invalidated from the internal cache any time one of its
    // dependents is invalidated, we know that all of our transitive deps are also in cache.
    placeholder.set(cachedKey);
    return placeholder;
  }

  // The key of this rule is built only after the keys of its dependencies are available, so
  // request those first.
  ListenableFuture<List<T>> dependencyKeys =
      Futures.transformAsync(
          Futures.immediateFuture(ruleDepsCache.get(rule)),
          (@Nonnull SortedSet<BuildRule> deps) -> {
            List<ListenableFuture<T>> perDepKeys =
                new ArrayList<>(SortedSets.sizeEstimate(rule.getBuildDeps()));
            for (BuildRule dep : deps) {
              perDepKeys.add(calculate(buckEventBus, dep));
            }
            return Futures.allAsList(perDepKeys);
          },
          service);

  // Build this rule's key once all dependency keys are done.
  ListenableFuture<T> ownKey =
      Futures.transform(
          dependencyKeys,
          (List<T> input) -> {
            try (Scope scope = ruleKeyCalculationScope.apply(buckEventBus, rule)) {
              return ruleKeyFactory.build(rule);
            } catch (Exception e) {
              throw new BuckUncheckedExecutionException(
                  e, String.format("When computing rulekey for %s.", rule));
            }
          },
          service);
  placeholder.setFuture(ownKey);
  return placeholder;
}