下面列出了com.google.common.util.concurrent.SettableFuture#isDone ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static void updateMemoryFuture(ListenableFuture<?> memoryPoolFuture, AtomicReference<SettableFuture<?>> targetFutureReference)
{
if (!memoryPoolFuture.isDone()) {
SettableFuture<?> currentMemoryFuture = targetFutureReference.get();
while (currentMemoryFuture.isDone()) {
SettableFuture<?> settableFuture = SettableFuture.create();
// We can't replace one that's not done, because the task may be blocked on that future
if (targetFutureReference.compareAndSet(currentMemoryFuture, settableFuture)) {
currentMemoryFuture = settableFuture;
}
else {
currentMemoryFuture = targetFutureReference.get();
}
}
SettableFuture<?> finalMemoryFuture = currentMemoryFuture;
// Create a new future, so that this operator can un-block before the pool does, if it's moved to a new pool
memoryPoolFuture.addListener(() -> finalMemoryFuture.set(null), directExecutor());
}
}
public ListenableFuture<?> process()
{
checkLockNotHeld("Cannot process while holding the driver lock");
// if the driver is blocked we don't need to continue
SettableFuture<?> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}
Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
ListenableFuture<?> future = processInternal(createTimer());
return updateDriverBlockedFuture(future);
});
return result.orElse(NOT_BLOCKED);
}
public void schedule(ScheduledCommand command) {
logger.trace("Scheduling command [{}]", command);
final SettableFuture<ScheduledTask> ref = SettableFuture.create();
pendingRequests.put(command.getSchedulerAddress(), ref);
try {
ScheduledTask task = scheduler.scheduleAt(() -> dispatch(command, ref), command.getScheduledTime());
ref.set(task);
metrics.onCommandScheduled();
}
finally {
// should never happen, but...
// if anything goes wrong, clear it out
if(!ref.isDone()) {
pendingRequests.remove(command.getSchedulerAddress(), ref);
ref.cancel(true);
}
}
}
private void dispatch(PlatformMessage message) {
String correlationId = message.getCorrelationId();
if(StringUtils.isBlank(correlationId)) {
return;
}
SettableFuture<PlatformMessage> future = requests.remove(correlationId);
if(future == null || future.isDone()) {
return;
}
if(message.isError()) {
MessageBody body = message.getValue();
future.setException(new ErrorEventException((String) body.getAttributes().get("code"), (String) body.getAttributes().get("message")));
} else {
future.set(message);
}
}
@Override
public void processResponseStream(StreamContext<ProcessEvent> context) {
Consumer<ProcessEvent> watcher = context.getWatcher();
InputStream response = context.getStream();
SettableFuture<Boolean> interrupter = context.getInterrupter();
interrupter.addListener(() -> Thread.currentThread().interrupt(), MoreExecutors.directExecutor());
try (FrameReader frameReader = new FrameReader(response)) {
Frame frame = frameReader.readFrame();
while (frame != null && !interrupter.isDone()) {
try {
ProcessEvent.watchRaw(watcher, frame.getMessage(), false);
} catch (Exception e) {
log.error("Cannot read body", e);
} finally {
frame = frameReader.readFrame();
}
}
} catch (Exception t) {
log.error("Cannot close reader", t);
}
}
public ListenableFuture<?> processFor(Duration duration)
{
checkLockNotHeld("Cannot process for a duration while holding the driver lock");
requireNonNull(duration, "duration is null");
// if the driver is blocked we don't need to continue
SettableFuture<?> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}
long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
OperationTimer operationTimer = createTimer();
driverContext.startProcessTimer();
driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
try {
long start = System.nanoTime();
do {
ListenableFuture<?> future = processInternal(operationTimer);
if (!future.isDone()) {
return updateDriverBlockedFuture(future);
}
}
while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
}
finally {
driverContext.getYieldSignal().reset();
driverContext.recordProcessed(operationTimer);
}
return NOT_BLOCKED;
});
return result.orElse(NOT_BLOCKED);
}
private int getNumPendingRecordFutures() {
int numPending = 0;
for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
if (!future.isDone()) {
numPending++;
}
}
return numPending;
}
private int getNumPendingRecordFutures() {
int numPending = 0;
for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
if (!future.isDone()) {
numPending++;
}
}
return numPending;
}
@Override
protected void innerClose(@Nullable Throwable t) {
if (t != null) {
for (SettableFuture<TaskResult> resultFuture : resultFutures) {
if (!resultFuture.isDone()) {
resultFuture.setException(t);
}
}
}
}
private int getNumPendingRecordFutures() {
int numPending = 0;
for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
if (!future.isDone()) {
numPending++;
}
}
return numPending;
}
@Test
public void test() throws InterruptedException, ExecutionException, TimeoutException {
SettableFuture<Object> future = SettableFuture.create();
boolean done = future.isDone();
logger.debug("done:{}", done);
}
@Test
public void test2() throws InterruptedException, ExecutionException, TimeoutException {
SettableFuture<Object> future = SettableFuture.create();
boolean done = future.isDone();
logger.debug("future done:{}", future.isDone());
SettableFuture<Object> future2 = SettableFuture.create();
future2.setFuture(future);
logger.debug("future2 done:{}", future2.isDone());
boolean timeout = future2.setException(new RuntimeException("timeout"));
logger.debug("timeout:{}", timeout);
}
private String getActualPath() throws ExecutionException, InterruptedException {
String path = _sync._nodePath;
if (path != null) {
return path;
}
SettableFuture<String> future = SettableFuture.create();
while (!future.isDone()) {
waitThenGetActualPath(future);
}
return future.get();
}
@Override
protected ListenableFuture<Tree> getTreeFuture(
String reason, Digest inputRoot, ExecutorService service, RequestMetadata requestMetadata) {
SettableFuture<Void> future = SettableFuture.create();
Tree.Builder tree = Tree.newBuilder().setRootDigest(inputRoot);
Set<Digest> digests = Sets.newConcurrentHashSet();
Queue<Digest> remaining = new ConcurrentLinkedQueue();
remaining.offer(inputRoot);
Context ctx = Context.current();
TreeCallback callback =
new TreeCallback(future) {
@Override
protected void onDirectory(Digest digest, Directory directory) {
tree.putDirectories(digest.getHash(), directory);
for (DirectoryNode childNode : directory.getDirectoriesList()) {
Digest child = childNode.getDigest();
if (digests.add(child)) {
remaining.offer(child);
}
}
}
@Override
boolean next() {
Digest nextDigest = remaining.poll();
if (!future.isDone() && nextDigest != null) {
ctx.run(
() ->
addCallback(
transform(
expectDirectory(reason, nextDigest, service, requestMetadata),
directory -> new DirectoryEntry(nextDigest, directory),
service),
this,
service));
return true;
}
return false;
}
};
callback.next();
return transform(future, (result) -> tree.build(), service);
}