下面列出了com.google.common.util.concurrent.ListenableFuture#cancel ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testMemoryFutureCancellation()
{
setUpCountStarFromOrdersWithJoin();
ListenableFuture<?> future = userPool.reserve(fakeQueryId, "test", TEN_MEGABYTES.toBytes());
assertTrue(!future.isDone());
try {
future.cancel(true);
fail("cancel should fail");
}
catch (UnsupportedOperationException e) {
assertEquals(e.getMessage(), "cancellation is not supported");
}
userPool.free(fakeQueryId, "test", TEN_MEGABYTES.toBytes());
assertTrue(future.isDone());
}
private static void assertNoStateChange(StateMachine<State> stateMachine, StateChanger stateChange)
{
State initialState = stateMachine.get();
ListenableFuture<State> futureChange = stateMachine.getStateChange(initialState);
SettableFuture<State> listenerChange = addTestListener(stateMachine);
// listeners should not be added if we are in a terminal state, but listener should fire
boolean isTerminalState = stateMachine.isTerminalState(initialState);
if (isTerminalState) {
assertEquals(stateMachine.getStateChangeListeners(), ImmutableSet.of());
}
stateChange.run();
assertEquals(stateMachine.get(), initialState);
// the future change will trigger if the state machine is in a terminal state
// this is to prevent waiting for state changes that will never occur
assertEquals(futureChange.isDone(), isTerminalState);
futureChange.cancel(true);
// test listener future only completes if the state actually changed
assertFalse(listenerChange.isDone());
listenerChange.cancel(true);
}
@Test
public void releaseOnCancellation() {
// Setup server
startServer((req, observer) -> {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
observer.onNext("delayed_response");
observer.onCompleted();
});
ListenableFuture<String> future = ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "foo");
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
future.cancel(true);
// Verify
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener.getResult().get(), Mockito.times(0)).onIgnore();
Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess();
verifyCounts(0, 0, 1, 0);
}
public List<PartETag> waitForAllPartUploads() throws IOException {
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
LOG.warn("Interrupted partUpload:" + ie, ie);
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
//there is no way of recovering so abort
//cancel all partUploads
for (ListenableFuture<PartETag> future : partETagsFutures) {
future.cancel(true);
}
//abort multipartupload
this.abort();
throw new IOException("Part upload failed in multi-part upload with " +
"id '" +uploadId + "':" + ee, ee);
}
//should not happen?
return null;
}
private <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> listenableFuture, Executor executor) {
CompletableFuture<T> completableFuture = new CompletableFuture<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
listenableFuture.cancel(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
}
};
Futures.addCallback(listenableFuture, new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable T result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable t) {
completableFuture.completeExceptionally(t);
}
}, executor);
return completableFuture;
}
@Test(expectedExceptions = ExecutionException.class)
public void shouldAssertAsyncHttpStatusCodeContinuallyEqualsFails() throws Exception {
stopServer();
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
final ListenableFuture<?> future =
HttpAsserts.assertAsyncHttpStatusCodeContinuallyEquals(listeningExecutor, testUri("/missing"), 200);
startAfter(DELAY_FOR_SERVER_TO_SETTLE.add(Duration.seconds(1)));
Time.sleep(DELAY_FOR_SERVER_TO_SETTLE);
if (future.isDone()) {
Object result = future.get(); // should throw exception
LOG.warn("Should have failed, instead gave "+result+" (accessing "+server+")");
} else {
LOG.warn("Future should have been done");
}
future.cancel(true);
}
/**
* Block awaiting all outstanding uploads to complete.
*
* @return list of results
* @throws IOException IO Problems
*/
private List<PartETag> waitForAllPartUploads() throws IOException {
LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
LOG.warn("Interrupted partUpload", ie);
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException ee) {
// there is no way of recovering so abort
// cancel all partUploads
LOG.debug("While waiting for upload completion", ee);
LOG.debug("Cancelling futures");
for (ListenableFuture<PartETag> future : partETagsFutures) {
future.cancel(true);
}
// abort multipartupload
abort();
throw extractException("Multi-part upload with id '" + uploadId + "' to " + key, key, ee);
}
}
/**
* convert ListenableFuture of Type S to CompletableFuture of Type T.
*/
static <S, T> CompletableFuture<T> toCompletableFuture(ListenableFuture<S> sourceFuture, Function<S, T> resultConvert,
Executor executor) {
CompletableFuture<T> targetFuture = new CompletableFuture<T>() {
// the cancel of targetFuture also cancels the sourceFuture.
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
return sourceFuture.cancel(mayInterruptIfRunning);
}
};
sourceFuture.addListener(() -> {
try {
targetFuture.complete(resultConvert.apply(sourceFuture.get()));
} catch (Exception e) {
targetFuture.completeExceptionally(toEtcdException(e));
}
}, executor);
return targetFuture;
}
@Override
public void run() {
// If either of these reads return null then we must be after a
// successful cancel or another call to this method.
TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
if (timeoutFuture == null) {
return;
}
ListenableFuture<V> delegate = timeoutFuture.delegateRef;
if (delegate == null) {
return;
}
/*
* If we're about to complete the TimeoutFuture, we want to release our
* reference to it. Otherwise, we'll pin it (and its result) in memory
* until the timeout task is GCed. (The need to clear our reference to
* the TimeoutFuture is the reason we use a *static* nested class with
* a manual reference back to the "containing" class.)
*
* This has the nice-ish side effect of limiting reentrancy: run() calls
* timeoutFuture.setException() calls run(). That reentrancy would
* already be harmless, since timeoutFuture can be set (and delegate
* cancelled) only once. (And "set only once" is important for other
* reasons: run() can still be invoked concurrently in different threads,
* even with the above null checks.)
*/
timeoutFutureRef = null;
if (delegate.isDone()) {
timeoutFuture.setFuture(delegate);
} else {
try {
timeoutFuture.setException(
new TimeoutException("Future timed out: " + delegate));
} finally {
delegate.cancel(true);
}
}
}
@Test
public void unaryFutureCallCancelled() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
final AtomicReference<String> cancelMessage = new AtomicReference<String>();
final AtomicReference<Throwable> cancelCause = new AtomicReference<Throwable>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void cancel(String message, Throwable cause) {
cancelMessage.set(message);
cancelCause.set(cause);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
future.cancel(true);
assertEquals("GrpcFuture was cancelled", cancelMessage.get());
assertNull(cancelCause.get());
listener.get().onMessage("bar");
listener.get().onClose(Status.OK, new Metadata());
try {
future.get();
fail("Should fail");
} catch (CancellationException e) {
// Exepcted
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
for(ListenableFuture<?> l: this.delegate.values()) {
l.cancel(mayInterruptIfRunning);
}
return this.results.cancel(mayInterruptIfRunning);
}
/**
* Asynchronous hello
*
* @param user user name
* @param depth depth of nested calls, {@literal 0} for simple calls, use positive value for nesting
* @return an hello statement
*/
public final Future<String> saysHelloAsync(String user, int depth) {
Req request = buildRequest(user, depth);
ListenableFuture<Rep> future = executeAsync(request);
return new Future<>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
public String get() throws InterruptedException, ExecutionException {
// TODO : check if something is thrown when there is a server error
return getResponseMessage(future.get());
}
@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// TODO : check if something is thrown when there is a server error
return getResponseMessage(future.get(timeout, unit));
}
};
}
@Test
public void backwardsCancelShouldWork() {
CompletableFuture<String> cf = new CompletableFuture<>();
ListenableFuture<String> lf = MoreFutures.fromCompletableFuture(cf);
lf.cancel(true);
assertThat(cf.isDone()).isTrue();
assertThat(cf.isCancelled()).isTrue();
}
@Test
public void shouldAssertAsyncHttpStatusCodeContinuallyEquals() throws Exception {
stopServer();
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
final ListenableFuture<?> future =
HttpAsserts.assertAsyncHttpStatusCodeContinuallyEquals(listeningExecutor, simpleEndpoint, 200);
startAfter(DELAY_FOR_SERVER_TO_SETTLE.add(Duration.seconds(1)));
if (future.isDone()) {
future.get(); // should not throw exception
}
future.cancel(true);
}
private void cancelProcessing(BlockingQueue<FileSystemAccessRequest> requestQueue, Queue<ParallelBuildContext> afterGenerateQueue, ListenableFuture<?> generatorResult) {
// make sure waiting put on the queue are processed by freeing space in the queue
requestQueue.clear();
// stop processing of resources immediately
generatorResult.cancel(true);
for (ParallelBuildContext context : afterGenerateQueue) {
try {
getGenerator2().afterGenerate(context.resource, context.synchronousFileSystemAccess, context.getGeneratorContext());
} catch (Exception e) {
logger.error("Error running afterGenerate hook", e);
}
}
}
@Test public void createFutureProduced_cancelPropagatesBackwards() throws Exception {
ListenableFuture<String> future = SettableFuture.create();
ListenableFuture<Produced<String>> producedFuture = Producers.createFutureProduced(future);
assertThat(producedFuture.isDone()).isFalse();
producedFuture.cancel(false);
assertThat(future.isCancelled()).isTrue();
}
@Test public void createFutureProduced_cancelDoesNotPropagateForwards() throws Exception {
ListenableFuture<String> future = SettableFuture.create();
ListenableFuture<Produced<String>> producedFuture = Producers.createFutureProduced(future);
assertThat(producedFuture.isDone()).isFalse();
future.cancel(false);
assertThat(producedFuture.isCancelled()).isFalse();
assertThat(getProducedException(producedFuture.get()).getCause())
.isInstanceOf(CancellationException.class);
}
/**
* {@inheritDoc}
* <p>
* <p>If a cancellation attempt succeeds on a {@code Future} that had
* previously been {@linkplain#setFuture set asynchronously}, then the
* cancellation will also be propagated to the delegate {@code Future} that
* was supplied in the {@code setFuture} call.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
Object localValue = value;
boolean rValue = false;
if (localValue == null | localValue instanceof SetFuture) {
// Try to delay allocating the exception. At this point we may still
// lose the CAS, but it is certainly less likely.
Throwable cause =
GENERATE_CANCELLATION_CAUSES
? new CancellationException("Future.cancel() was called.")
: null;
Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
AbstractFuture<?> abstractFuture = this;
while (true) {
if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
rValue = true;
// We call interuptTask before calling complete(), which is
// consistent with FutureTask
if (mayInterruptIfRunning) {
abstractFuture.interruptTask();
}
complete(abstractFuture);
if (localValue instanceof SetFuture) {
// propagate cancellation to the future set in setfuture, this is
// racy, and we don't care if we are successful or not.
ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue)
.future;
if (futureToPropagateTo instanceof TrustedFuture) {
// If the future is a TrustedFuture then we specifically avoid
// calling cancel() this has 2 benefits
// 1. for long chains of futures strung together with setFuture
// we consume less stack
// 2. we avoid allocating Cancellation objects at every level of
// the cancellation chain
// We can only do this for TrustedFuture, because
// TrustedFuture.cancel is final and does nothing but delegate
// to this method.
AbstractFuture<?> trusted = (AbstractFuture<?>)
futureToPropagateTo;
localValue = trusted.value;
if (localValue == null | localValue instanceof SetFuture) {
abstractFuture = trusted;
continue; // loop back up and try to complete the new future
}
} else {
// not a TrustedFuture, call cancel directly.
futureToPropagateTo.cancel(mayInterruptIfRunning);
}
}
break;
}
// obj changed, reread
localValue = abstractFuture.value;
if (!(localValue instanceof SetFuture)) {
// obj cannot be null at this point, because value can only change
// from null to non-null. So if value changed (and it did since we
// lost the CAS), then it cannot be null and since it isn't a
// SetFuture, then the future must be done and we should exit the loop
break;
}
}
}
return rValue;
}
/**
* Sets the result of this {@code Future} to match the supplied input
* {@code Future} once the supplied {@code Future} is done, unless this
* {@code Future} has already been cancelled or set (including "set
* asynchronously," defined below).
* <p>
* <p>If the supplied future is {@linkplain #isDone done} when this method
* is called and the call is accepted, then this future is guaranteed to
* have been completed with the supplied future by the time this method
* returns. If the supplied future is not done and the call is accepted, then
* the future will be <i>set asynchronously</i>. Note that such a result,
* though not yet known, cannot be overridden by a call to a {@code set*}
* method, only by a call to {@link #cancel}.
* <p>
* <p>If the call {@code setFuture(delegate)} is accepted and this {@code
* Future} is later cancelled, cancellation will be propagated to {@code
* delegate}. Additionally, any call to {@code setFuture} after any
* cancellation will propagate cancellation to the supplied {@code Future}.
*
* @param future the future to delegate to
* @return true if the attempt was accepted, indicating that the {@code
* Future} was not previously cancelled or set.
* @since 19.0
*/
@Beta
@SuppressWarnings("deadstore")
protected boolean setFuture(ListenableFuture<? extends V> future) {
checkNotNull(future);
Object localValue = value;
if (localValue == null) {
if (future.isDone()) {
Object val = getFutureValue(future);
if (ATOMIC_HELPER.casValue(this, null, val)) {
complete(this);
return true;
}
return false;
}
SetFuture valueToSet = new SetFuture<V>(this, future);
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
// the listener is responsible for calling completeWithFuture,
// directExecutor is appropriate since all we are doing is unpacking
// a completed future which should be fast.
try {
future.addListener(valueToSet, directExecutor());
} catch (Throwable t) {
// addListener has thrown an exception! SetFuture.run can't throw
// any exceptions so this must have been caused by addListener
// itself. The most likely explanation is a misconfigured mock. Try
// to switch to Failure.
Failure failure;
try {
failure = new Failure(t);
} catch (Throwable oomMostLikely) {
failure = Failure.FALLBACK_INSTANCE;
}
// Note: The only way this CAS could fail is if cancel() has raced
// with us. That is ok.
boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
}
return true;
}
localValue = value; // we lost the cas, fall through and maybe cancel
}
// The future has already been set to something. If it is cancellation we
// should cancel the incoming future.
if (localValue instanceof Cancellation) {
// we don't care if it fails, this is best-effort.
future.cancel(((Cancellation) localValue).wasInterrupted);
}
return false;
}
@Test
public void verifyInterruptedRequest_followsUpWithEmptyRequest() {
final EspressoRemote espressoRemote = new EspressoRemote(mockedInstrumentation);
espressoRemote.init();
final CountDownLatch espressoRequestLatch = new CountDownLatch(1);
final CountDownLatch emptyRequestLatch = new CountDownLatch(1);
// create a tmp handler to receive a response from the EspressoRemote under test
final HandlerThread handlerThread = new HandlerThread("OtherEspresso");
handlerThread.start();
Handler handler =
new Handler(handlerThread.getLooper()) {
@Override
public void handleMessage(Message msg) {
switch (msg.what) {
case EspressoRemote.MSG_HANDLE_ESPRESSO_REQUEST:
espressoRequestLatch.countDown();
break;
case EspressoRemote.MSG_HANDLE_EMPTY_REQUEST:
emptyRequestLatch.countDown();
break;
default:
super.handleMessage(msg);
}
}
};
// create a tmp messenger to represent "other" remote Espresso
Messenger otherEspressoMessenger = new Messenger(handler);
Set<Messenger> clients = new HashSet<>();
clients.add(otherEspressoMessenger);
clients.add(espressoRemote.incomingHandler.messengerHandler);
when(mockedInstrumentation.getClientsForType(EspressoRemote.TYPE)).thenReturn(clients);
// send out an interaction request to remote espresso
ListenableFuture<Void> future =
remoteExecutor.submit(
espressoRemote.createRemoteCheckCallable(
RootMatchers.DEFAULT, withId(123), null, matches(withText(is("test")))));
try {
// wait until remote Espresso receives an interaction request
assertTrue(espressoRequestLatch.await(200, TimeUnit.MILLISECONDS));
// interrupt the remote interaction request
future.cancel(true);
// ensure extra empty message was sent out to flush out the remote instance handler queue
assertTrue(emptyRequestLatch.await(200, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
fail("Unexpected InterruptedException");
}
// clean up
handlerThread.getLooper().quit();
}