下面列出了 com.google.common.util.concurrent.AbstractFuture 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that an {@link InterruptedException} raised by the future handed out from
 * {@code mockStdoutSaver.getResult()} escapes {@code testCommandCaller.call(...)} as an
 * {@link InterruptedException}, and that the command was still executed as expected.
 */
@Test
public void testCall_interruptedExceptionPassthrough()
    throws CommandExecutionException, CommandExitException, ExecutionException,
        InterruptedException, IOException {
  // Stub future whose get() always reports an interruption.
  AbstractFuture<String> interruptingFuture =
      new AbstractFuture<String>() {
        @Override
        public String get() throws InterruptedException {
          throw new InterruptedException();
        }
      };
  Mockito.when(mockStdoutSaver.getResult()).thenReturn(interruptingFuture);

  boolean sawInterruption = false;
  try {
    testCommandCaller.call(fakeCommand, fakeWorkingDirectory, fakeEnvironment);
  } catch (InterruptedException expected) {
    // This is the outcome the test is looking for.
    sawInterruption = true;
  }
  if (!sawInterruption) {
    Assert.fail("InterruptedException expected but not found.");
  }

  verifyCommandExecution();
}
/**
 * Schedules {@code task} on {@code executor} and hands back a future that acts as a
 * cancellation handle.
 *
 * <p>The first run is submitted as a {@code ScheduledTaskImpl} after {@code initDelay}
 * (immediately when {@code initDelay} is {@code null}). Nothing in this method ever completes
 * the returned future; cancelling it flips a flag that is shared with the scheduled task.
 *
 * @param executor the executor that runs the task; all tasks would be stopped after the executor
 *     has been marked as shutting down.
 * @param initDelay delay before the first run, or {@code null} for no delay.
 * @param task the work to run; any exception it throws would cancel the task, so callers should
 *     swallow exceptions themselves.
 * @return a future that can cancel the task.
 */
public static Future<?> scheduleWithDynamicDelay(@Nonnull ScheduledExecutorService executor,
    @Nullable Duration initDelay, @Nonnull Scheduled task) {
  checkNotNull(executor);
  checkNotNull(task);

  // Flag shared between the returned handle and the scheduled task.
  final AtomicBoolean cancelFlag = new AtomicBoolean(false);
  AbstractFuture<?> cancelHandle = new AbstractFuture<Object>() {
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      // Record the cancellation for the task before delegating to the regular future logic.
      cancelFlag.set(true);
      return super.cancel(mayInterruptIfRunning);
    }
  };

  ScheduledTaskImpl firstRun = new ScheduledTaskImpl(executor, task, cancelFlag);
  long firstDelayMillis = 0;
  if (initDelay != null) {
    firstDelayMillis = initDelay.toMillis();
  }
  executor.schedule(firstRun, firstDelayMillis, MILLISECONDS);
  return cancelHandle;
}
/**
 * Subscribes to the given {@link Observable} right away and exposes its single value as a
 * {@link ListenableFuture}.
 * <p>
 * The source is wrapped with {@link Observable#single()}: if it emits more than one item or no
 * items, the future is notified of an IllegalArgumentException or NoSuchElementException
 * respectively, delivered through {@link Observer#onError(Throwable)}. Otherwise the one value
 * passed to {@link Observer#onNext(Object)} becomes the future's result once the source completes.
 *
 * @param observable The source {@link Observable} for the value.
 * @return a {@link ListenableFuture} that sets the value on completion.
 */
public static <T> ListenableFuture<T> to(final Observable<T> observable) {
  /** Future that is completed by the subscriber it owns. */
  class SingleValueFuture extends AbstractFuture<T> {
    final Subscriber<? super T> subscriber = new Subscriber<T>() {
      private T pending;

      @Override
      public void onNext(T item) {
        // Hold the item until onCompleted, to make sure the observable only emits one value.
        pending = item;
      }

      @Override
      public void onError(Throwable error) {
        setException(error);
      }

      @Override
      public void onCompleted() {
        set(pending);
      }
    };

    // NOTE(review): unsubscription is wired only through this hook; presumably it is not
    // triggered by cancel(false) - confirm against the Guava version in use.
    @Override
    protected void interruptTask() {
      subscriber.unsubscribe();
    }
  }

  SingleValueFuture result = new SingleValueFuture();
  // Futures are hot so subscribe immediately
  observable.single().subscribe(result.subscriber);
  return result;
}