下面列出了 java.util.concurrent.RunnableFuture 类 API 的用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Cancels a snapshot future before it is ever run, then hands it to a thread anyway and
 * checks that fetching its result fails and that the RocksDB objects have been released.
 */
@Test
public void testDismissingSnapshotNotRunnable() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> cancelledSnapshot =
            keyedStateBackend.snapshot(
                0L,
                0L,
                testStreamFactory,
                CheckpointOptions.forCheckpointWithDefaultLocation());
        cancelledSnapshot.cancel(true);

        final Thread runner = new Thread(cancelledSnapshot);
        runner.start();

        try {
            cancelledSnapshot.get();
            fail();
        } catch (Exception ignored) {
            // Expected: the future was cancelled, so get() must not return normally.
        }

        runner.join();
        verifyRocksObjectsReleased();
    } finally {
        // Always tear the backend down so later tests start from a clean state.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
/**
 * Delegates the snapshot to {@code snapshotStrategy} and afterwards reports the start time
 * of this call through {@code logSyncCompleted}.
 *
 * @param checkpointId id forwarded to the snapshot strategy
 * @param timestamp timestamp forwarded to the snapshot strategy
 * @param streamFactory stream factory forwarded to the strategy and to the completion log
 * @param checkpointOptions options forwarded to the snapshot strategy
 * @return the runnable future produced by the snapshot strategy
 * @throws Exception if the snapshot strategy fails
 */
@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
    long checkpointId,
    long timestamp,
    @Nonnull CheckpointStreamFactory streamFactory,
    @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    final long startedAtMillis = System.currentTimeMillis();

    final RunnableFuture<SnapshotResult<OperatorStateHandle>> pendingSnapshot =
        snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

    snapshotStrategy.logSyncCompleted(streamFactory, startedAtMillis);
    return pendingSnapshot;
}
/**
 * Creates the snapshot future for the keyed state.
 *
 * <p>When no key/value state is registered, an already completed future holding an empty
 * {@link SnapshotResult} is returned; otherwise the work is delegated to {@code doSnapshot}.
 *
 * @param checkpointId id of the checkpoint, forwarded to {@code doSnapshot}
 * @param timestamp checkpoint timestamp, forwarded to {@code doSnapshot} and used in the log
 * @param streamFactory stream factory forwarded to {@code doSnapshot}
 * @param checkpointOptions options forwarded to {@code doSnapshot}
 * @return a never-null runnable future with the snapshot result
 * @throws Exception if {@code doSnapshot} fails
 */
@Nonnull
@Override
public final RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
    long checkpointId,
    long timestamp,
    @Nonnull CheckpointStreamFactory streamFactory,
    @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    if (!kvStateInformation.isEmpty()) {
        return doSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    // The message used to claim "Returning null", but this method never returns null:
    // it returns a completed future wrapping an empty result. The parameterized call
    // already skips formatting when debug is disabled, so no explicit guard is needed.
    LOG.debug(
        "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning an empty snapshot result.",
        timestamp);
    return DoneFuture.of(SnapshotResult.empty());
}
/**
 * Triggers the configured action, if any, and returns an already completed future
 * describing the outcome.
 *
 * @return a completed future: RESOLVED (with the action's start message when it has one)
 *         if an action is configured, UNRESOLVED otherwise
 */
@Override
public java.util.concurrent.Future<ProjectProblemsProvider.Result> resolve() {
    final ProjectProblemsProvider.Result outcome;
    if (action == null) {
        outcome = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.UNRESOLVED, "No resolution for the problem");
    } else {
        // Fire the action first; the start message is read only afterwards.
        action.actionPerformed(null);
        final String startMessage = (String) action.getValue(ACT_START_MESSAGE);
        outcome = (startMessage == null)
            ? ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED)
            : ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED, startMessage);
    }

    // A FutureTask around a no-op Runnable is used purely as a carrier for the
    // precomputed result; running it here completes the future before it is returned.
    final RunnableFuture<ProjectProblemsProvider.Result> completed = new FutureTask<>(new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    }, outcome);
    completed.run();
    return completed;
}
/**
 * Handles a rejected task by blocking until it can be put onto the executor's queue.
 *
 * <p>If the waiting thread is interrupted, its interrupt status is restored and then:
 * a {@link RunnableFuture} is rejected with a {@link RejectedExecutionException}, while a
 * {@code RestoreObjectTask} has its failure counter incremented and its callback notified.
 * Any other runnable is dropped silently.
 *
 * @param r the task that was rejected
 * @param executor the executor that rejected the task
 * @throws RejectedExecutionException if interrupted while re-queueing a {@link RunnableFuture}
 */
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        executor.getQueue().put(r);
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the interrupt flag; set it again so the
        // caller (and any code further up the stack) can still observe the interruption.
        Thread.currentThread().interrupt();

        if (r instanceof RunnableFuture) {
            throw new RejectedExecutionException(e);
        }
        ObsException obsException = new ObsException(e.getMessage(), e);
        if (r instanceof RestoreObjectTask) {
            RestoreObjectTask task = (RestoreObjectTask) r;
            task.getProgressStatus().failTaskIncrement();
            TaskCallback<RestoreObjectResult, RestoreObjectRequest> callback = task.getCallback();
            callback.onException(obsException, task.getTaskRequest());
        }
    }
}
/**
 * Schedules {@code writeDirectoryAction} inside a write action via {@code invokeLater} and
 * blocks until it has finished.
 *
 * @param dir target directory; when {@code null} nothing is done and {@code null} is returned
 * @param dirWrapper description of what to write, passed to {@code writeDirectoryAction}
 * @param project project passed to {@code writeDirectoryAction}
 * @return the result of {@code writeDirectoryAction}, or {@code null} if {@code dir} is
 *         {@code null}, the action failed, or the wait was interrupted
 */
public static PsiDirectory writeDirectory(PsiDirectory dir, DirectoryWrapper dirWrapper, Project project) {
    if (dir == null) {
        //todo print error
        return null;
    }
    // An anonymous Computable (not a lambda) keeps the runWriteAction call unambiguous.
    RunnableFuture<PsiDirectory> runnableFuture = new FutureTask<>(() ->
        ApplicationManager.getApplication().runWriteAction(new Computable<PsiDirectory>() {
            @Override
            public PsiDirectory compute() {
                return writeDirectoryAction(dir, dirWrapper, project);
            }
        }));
    ApplicationManager.getApplication().invokeLater(runnableFuture);
    try {
        return runnableFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Do not swallow the interruption: restore the flag for callers up the stack.
            Thread.currentThread().interrupt();
        }
        Logger.log("runnableFuture " + e.getMessage());
        Logger.printStack(e);
    }
    return null;
}
/**
 * Hands the snapshot request to {@code snapshotStrategy} and then reports the start time
 * of this call via {@code logSyncCompleted}.
 *
 * @param checkpointId id forwarded to the snapshot strategy
 * @param timestamp timestamp forwarded to the snapshot strategy
 * @param streamFactory stream factory forwarded to the strategy and to the completion log
 * @param checkpointOptions options forwarded to the snapshot strategy
 * @return the runnable future produced by the snapshot strategy
 * @throws IOException declared by this method's signature
 */
@Nonnull
@Override
@SuppressWarnings("unchecked")
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
    final long checkpointId,
    final long timestamp,
    @Nonnull final CheckpointStreamFactory streamFactory,
    @Nonnull CheckpointOptions checkpointOptions) throws IOException {

    final long syncPhaseStart = System.currentTimeMillis();

    final RunnableFuture<SnapshotResult<KeyedStateHandle>> pendingSnapshot =
        snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

    snapshotStrategy.logSyncCompleted(streamFactory, syncPhaseStart);
    return pendingSnapshot;
}
/**
 * Stops this task executor without waiting for queued work.
 *
 * <p>Only the caller that flips {@code _running} from {@code true} to {@code false}
 * performs the shutdown. The underlying executor is shut down with {@code shutdownNow()},
 * and every never-started task that is a {@link RunnableFuture} is cancelled.
 */
@Override
public void stopImmediately()
{
    if (_running.compareAndSet(true,false))
    {
        ExecutorService executor = _executor;
        if (executor != null)
        {
            LOGGER.debug("Stopping task executor {} immediately", _name);
            List<Runnable> cancelledTasks = executor.shutdownNow();
            for (Runnable runnable : cancelledTasks)
            {
                if (runnable instanceof RunnableFuture<?>)
                {
                    // shutdownNow() only hands back the queued tasks; cancel them explicitly.
                    ((RunnableFuture<?>) runnable).cancel(true);
                }
            }
            _executor = null;
            _taskThread = null;
            // Parameterized like the log call above, so no string is built when debug is off.
            LOGGER.debug("Task executor was stopped immediately. Number of unfinished tasks: {}", cancelledTasks.size());
        }
    }
}
/**
 * Snapshots a freshly created operator state backend and checks that the result carries
 * no job-manager-owned state handle.
 */
@Test
public void testSnapshotEmpty() throws Exception {
    final AbstractStateBackend backendFactory = new MemoryStateBackend(4096);
    final CloseableRegistry registry = new CloseableRegistry();

    final OperatorStateBackend backendUnderTest = backendFactory.createOperatorStateBackend(
        createMockEnvironment(), "testOperator", emptyStateHandles, registry);

    final CheckpointStreamFactory memoryStreams = new MemCheckpointStreamFactory(4096);

    final RunnableFuture<SnapshotResult<OperatorStateHandle>> pendingSnapshot =
        backendUnderTest.snapshot(
            0L,
            0L,
            memoryStreams,
            CheckpointOptions.forCheckpointWithDefaultLocation());

    final SnapshotResult<OperatorStateHandle> result = FutureUtils.runIfNotDoneAndGet(pendingSnapshot);
    assertNull(result.getJobManagerOwnedSnapshot());
}
/**
 * Stores the given collaborators in the corresponding fields.
 *
 * @param doc document stored in {@code this.doc}
 * @param layer layer handle stored in {@code this.layer}
 * @param file file object stored in {@code this.file}
 * @param lines future yielding a string-to-integer map, stored in {@code this.lines}
 * @param errors list that accepts {@code ErrorDescription} elements, stored in {@code this.errors}
 */
Context(Document doc, LayerHandle layer, FileObject file, RunnableFuture<Map<String,Integer>> lines, List<? super ErrorDescription> errors) {
    this.errors = errors;
    this.lines = lines;
    this.file = file;
    this.layer = layer;
    this.doc = doc;
}
/**
 * Creates the task wrapper for a submitted runnable, making sure it carries a priority.
 *
 * @param runnable the submitted work; wrapped with {@code Priority.NORMAL} unless it already
 *        is a {@code PrioritizedRunnable}
 * @param value the result the future yields on completion
 * @return a {@code PrioritizedFutureTask} stamped with the next insertion-order number
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    final Runnable prioritized = (runnable instanceof PrioritizedRunnable)
        ? runnable
        : PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
    final long order = insertionOrder.incrementAndGet();
    return new PrioritizedFutureTask<>((PrioritizedRunnable) prioritized, value, order);
}
/**
 * A future made from a runnable plus a fixed result must return that very same object
 * from {@code get()} once it has been run.
 */
@Test
public void getRunnableResultTest() throws InterruptedException, ExecutionException {
    final Object expected = new Object();
    final ExecuteOnGetFutureFactory factory = makeFutureFactory();

    final RunnableFuture<Object> underTest = factory.make(DoNothingRunnable.instance(), expected);
    underTest.run();

    // Identity comparison on purpose: the exact instance must be handed back.
    final Object actual = underTest.get();
    assertTrue(actual == expected);
}
/**
 * For each kind of test state: fills keys 0..99 with random values, takes two snapshots,
 * runs both on their own threads at the same time, and waits for both results.
 */
void testNonConcurrentSnapshotTransformerAccess() throws Exception {
    final List<TestState> statesUnderTest = Arrays.asList(
        new TestValueState(),
        new TestListState(),
        new TestMapState()
    );

    for (TestState current : statesUnderTest) {
        for (int key = 0; key < 100; key++) {
            backend.setCurrentKey(key);
            current.setToRandomValue();
        }

        final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> first =
            backend.snapshot(1L, 0L, streamFactory, options);
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> second =
            backend.snapshot(2L, 0L, streamFactory, options);

        final Thread firstRunner = new Thread(first, "snapshot1");
        firstRunner.start();
        final Thread secondRunner = new Thread(second, "snapshot2");
        secondRunner.start();

        firstRunner.join();
        secondRunner.join();

        // get() rethrows anything that went wrong inside the snapshot threads.
        first.get();
        second.get();
    }
}
/**
 * Runs the snapshot future if needed and registers the resulting handle's shared state.
 *
 * @param snapshotRunnableFuture the snapshot to complete; run on the calling thread if not done
 * @param sharedStateRegistry registry the job-manager-owned handle registers with
 * @return the job-manager-owned handle, or {@code null} if the snapshot produced none
 * @throws Exception if running or retrieving the snapshot fails
 */
private KeyedStateHandle runSnapshot(
    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture,
    SharedStateRegistry sharedStateRegistry) throws Exception {

    if (!snapshotRunnableFuture.isDone()) {
        snapshotRunnableFuture.run();
    }

    final KeyedStateHandle handle = snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
    if (handle == null) {
        return null;
    }
    handle.registerSharedStates(sharedStateRegistry);
    return handle;
}
/**
 * Creates an operator state backend whose {@code snapshot(...)} always returns a task
 * that fails when executed.
 *
 * <p>The backend is produced by an anonymous {@code DefaultOperatorStateBackendBuilder}
 * subclass whose {@code build()} returns an anonymous {@code DefaultOperatorStateBackend}
 * subclass with {@code snapshot(...)} overridden.
 *
 * @param env supplies the user class loader and execution config passed to the builder
 * @param operatorIdentifier not used by this implementation
 * @param stateHandles passed through to the builder
 * @param cancelStreamRegistry passed through to the builder
 * @return the backend built by the anonymous builder
 * @throws Exception declared by the overridden signature
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
// NOTE(review): the meaning of this boolean flag is not visible here — confirm against the builder's constructor.
true,
stateHandles,
cancelStreamRegistry) {
@Override
@SuppressWarnings("unchecked")
public DefaultOperatorStateBackend build() {
// NOTE(review): executionConfig is not declared in this method; presumably it (and possibly
// cancelStreamRegistry) resolves to a member inherited from the builder — verify.
return new DefaultOperatorStateBackend(
executionConfig,
cancelStreamRegistry,
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
// A mocked strategy is passed; the snapshot(...) override below never consults it.
mock(AbstractSnapshotStrategy.class)
) {
@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception {
// The callable throws as soon as the task is run, so the failure only surfaces
// when the returned future is executed and its result is requested.
return new FutureTask<>(() -> {
throw new Exception("Async part snapshot exception.");
});
}
};
}
}.build();
}
/**
 * Runs a snapshot on a separate thread, lets it complete, and checks the resulting state
 * handle, that every created stream was closed, and that RocksDB objects were released.
 */
@Test
public void testCompletingSnapshot() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> pendingSnapshot =
            keyedStateBackend.snapshot(
                0L,
                0L,
                testStreamFactory,
                CheckpointOptions.forCheckpointWithDefaultLocation());

        final Thread snapshotRunner = new Thread(pendingSnapshot);
        snapshotRunner.start();

        // wait for snapshot to run
        waiter.await();
        waiter.reset();
        runStateUpdates();
        // allow checkpointing to start writing
        blocker.trigger();
        // wait for snapshot stream writing to run
        waiter.await();

        final KeyedStateHandle handle = pendingSnapshot.get().getJobManagerOwnedSnapshot();
        assertNotNull(handle);
        assertTrue(handle.getStateSize() > 0);
        assertEquals(2, handle.getKeyGroupRange().getNumberOfKeyGroups());

        for (BlockingCheckpointOutputStream created : testStreamFactory.getAllCreatedStreams()) {
            assertTrue(created.isClosed());
        }

        snapshotRunner.join();
        verifyRocksObjectsReleased();
    } finally {
        // Always tear the backend down so later tests start from a clean state.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
/**
 * Cancels a snapshot while it is running on a separate thread and checks that the created
 * streams are closed, that fetching the result fails, and that RocksDB objects are released.
 */
@Test
public void testCancelRunningSnapshot() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> runningSnapshot =
            keyedStateBackend.snapshot(
                0L,
                0L,
                testStreamFactory,
                CheckpointOptions.forCheckpointWithDefaultLocation());

        final Thread snapshotRunner = new Thread(runningSnapshot);
        snapshotRunner.start();

        // wait for snapshot to run
        waiter.await();
        waiter.reset();
        runStateUpdates();
        runningSnapshot.cancel(true);
        // allow checkpointing to start writing
        blocker.trigger();

        for (BlockingCheckpointOutputStream created : testStreamFactory.getAllCreatedStreams()) {
            assertTrue(created.isClosed());
        }

        // wait for snapshot stream writing to run
        waiter.await();

        try {
            runningSnapshot.get();
            fail();
        } catch (Exception ignored) {
            // Expected: the snapshot was cancelled, so get() must not return normally.
        }

        snapshotRunner.join();
        verifyRocksObjectsReleased();
    } finally {
        // Always tear the backend down so later tests start from a clean state.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
/**
 * Chooses the future implementation matching the submitted callable.
 *
 * @param callable a {@code MergeTask}, or otherwise something castable to
 *        {@code MergeChunkHeapTask} (any other type fails with a {@code ClassCastException})
 * @return a {@code MainMergeFuture} for a {@code MergeTask}, a {@code SubMergeFuture} otherwise
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    if (!(callable instanceof MergeTask)) {
        final MergeChunkHeapTask chunkTask = (MergeChunkHeapTask) callable;
        return (RunnableFuture<T>) new SubMergeFuture(chunkTask);
    }
    final MergeTask mergeTask = (MergeTask) callable;
    return (RunnableFuture<T>) new MainMergeFuture(mergeTask);
}
/**
 * Completes the given {@code RunnableFuture} on the calling thread when it has not finished
 * yet, then returns its result.
 *
 * @param future the future to run (if still pending) and query; may be {@code null}
 * @param <T> type of the result
 * @return the future's result, or {@code null} when {@code future} itself is {@code null}
 * @throws ExecutionException if the computation threw an exception
 * @throws InterruptedException if the current thread was interrupted while waiting
 */
public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
    if (future != null) {
        final boolean stillPending = !future.isDone();
        if (stillPending) {
            future.run();
        }
        return future.get();
    }
    return null;
}
/**
 * Tells every registered listener that a task has finished executing.
 *
 * <p>A listener that throws is logged and skipped, so the remaining listeners are still
 * notified.
 *
 * @param future the task that was executed
 * @param t the throwable passed on to each listener's {@code postExecute}
 */
protected final void notifyPostExecute(RunnableFuture<?> future, Throwable t) {
    if (this.listeners.size() == 0) {
        return;
    }
    for (final Listener current : this.listeners) {
        try {
            current.postExecute(this, future, t);
        } catch (Throwable failure) {
            final String message =
                "["+ this.getName() + "]failed to notify postExecute, listener: " + current;
            LOGGER.error(message, failure);
        }
    }
}
/**
 * Returns the result of the given {@code RunnableFuture}, executing it first on the calling
 * thread when it is not done yet.
 *
 * @param future the future to run (if still pending) and query; may be {@code null}
 * @param <T> type of the result
 * @return the future's result, or {@code null} when {@code future} itself is {@code null}
 * @throws ExecutionException if the computation threw an exception
 * @throws InterruptedException if the current thread was interrupted while waiting
 */
public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
    if (future == null) {
        return null;
    }

    if (future.isDone()) {
        return future.get();
    }

    future.run();
    return future.get();
}
/**
 * Returns the future for the keyed-state checkpoint stream, creating and caching it on the
 * first call.
 *
 * @return the cached future; created from a {@code StreamCloserCallable} over the keyed
 *         state output stream when no future exists yet
 * @throws IOException declared by this method's signature
 */
@Nonnull
public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
    if (keyedStateCheckpointClosingFuture != null) {
        return keyedStateCheckpointClosingFuture;
    }

    final StreamCloserCallable<KeyGroupsStateHandle> streamCloser =
        new StreamCloserCallable<>(closableRegistry, keyedStateCheckpointOutputStream);
    final AsyncSnapshotTask closingTask = streamCloser.toAsyncSnapshotFutureTask(closableRegistry);

    keyedStateCheckpointClosingFuture = castAsKeyedStateHandle(closingTask);
    return keyedStateCheckpointClosingFuture;
}
/**
 * Tells every registered listener that a task is about to be executed.
 *
 * <p>A listener that throws is logged and skipped, so the remaining listeners are still
 * notified.
 *
 * @param future the task that is about to run
 */
protected final void notifyPrevExecute(RunnableFuture<?> future) {
    if (this.listeners.size() == 0) {
        return;
    }
    for (final Listener current : this.listeners) {
        try {
            current.prevExecute(this, future);
        } catch (Throwable failure) {
            final String message =
                "["+ this.getName() + "]failed to notify prevExecute, listener: " + current;
            LOGGER.error(message, failure);
        }
    }
}
/**
 * Exercises two simultaneous snapshots per state type: keys 0..99 are set to random values,
 * two snapshots are created and run on separate threads, and both results are awaited.
 */
void testNonConcurrentSnapshotTransformerAccess() throws Exception {
    final List<TestState> allStates = Arrays.asList(
        new TestValueState(),
        new TestListState(),
        new TestMapState()
    );

    for (TestState testState : allStates) {
        int key = 0;
        while (key < 100) {
            backend.setCurrentKey(key);
            testState.setToRandomValue();
            key++;
        }

        final CheckpointOptions defaultLocation = CheckpointOptions.forCheckpointWithDefaultLocation();
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotA =
            backend.snapshot(1L, 0L, streamFactory, defaultLocation);
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotB =
            backend.snapshot(2L, 0L, streamFactory, defaultLocation);

        final Thread threadA = new Thread(snapshotA, "snapshot1");
        threadA.start();
        final Thread threadB = new Thread(snapshotB, "snapshot2");
        threadB.start();

        threadA.join();
        threadB.join();

        // Surface any failure raised inside the snapshot threads.
        snapshotA.get();
        snapshotB.get();
    }
}
/**
 * Completes the given snapshot (running it on this thread when still pending) and
 * registers the shared state of the job-manager-owned handle, if there is one.
 *
 * @param snapshotRunnableFuture the snapshot to complete
 * @param sharedStateRegistry registry the resulting handle registers with
 * @return the job-manager-owned handle; may be {@code null}
 * @throws Exception if running or retrieving the snapshot fails
 */
private KeyedStateHandle runSnapshot(
    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture,
    SharedStateRegistry sharedStateRegistry) throws Exception {

    final boolean pending = !snapshotRunnableFuture.isDone();
    if (pending) {
        snapshotRunnableFuture.run();
    }

    final SnapshotResult<KeyedStateHandle> outcome = snapshotRunnableFuture.get();
    final KeyedStateHandle ownedByJobManager = outcome.getJobManagerOwnedSnapshot();
    if (null != ownedByJobManager) {
        ownedByJobManager.registerSharedStates(sharedStateRegistry);
    }
    return ownedByJobManager;
}
/**
 * Completes the given snapshot, running it on this thread when still pending, and returns
 * its job-manager-owned state handle.
 *
 * @param snapshotRunnableFuture the snapshot to complete
 * @return the job-manager-owned handle of the snapshot result
 * @throws Exception if running or retrieving the snapshot fails
 */
private OperatorStateHandle runSnapshot(
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunnableFuture) throws Exception {

    final boolean pending = !snapshotRunnableFuture.isDone();
    if (pending) {
        snapshotRunnableFuture.run();
    }

    final SnapshotResult<OperatorStateHandle> outcome = snapshotRunnableFuture.get();
    return outcome.getJobManagerOwnedSnapshot();
}
/**
 * Bundles the four snapshot futures of an operator.
 *
 * @param keyedStateManagedFuture stored in {@code this.keyedStateManagedFuture}
 * @param keyedStateRawFuture stored in {@code this.keyedStateRawFuture}
 * @param operatorStateManagedFuture stored in {@code this.operatorStateManagedFuture}
 * @param operatorStateRawFuture stored in {@code this.operatorStateRawFuture}
 */
public OperatorSnapshotFutures(
    @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture,
    @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture,
    @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture,
    @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture) {

    // Operator state futures.
    this.operatorStateRawFuture = operatorStateRawFuture;
    this.operatorStateManagedFuture = operatorStateManagedFuture;

    // Keyed state futures.
    this.keyedStateRawFuture = keyedStateRawFuture;
    this.keyedStateManagedFuture = keyedStateManagedFuture;
}
/**
 * Drives the given snapshot to completion on the calling thread (unless it is already
 * done) and registers the shared state of the resulting job-manager-owned handle.
 *
 * @param snapshotRunnableFuture the snapshot to complete
 * @param sharedStateRegistry registry the resulting handle registers with
 * @return the job-manager-owned handle, or {@code null} if the snapshot produced none
 * @throws Exception if running or retrieving the snapshot fails
 */
protected KeyedStateHandle runSnapshot(
    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture,
    SharedStateRegistry sharedStateRegistry) throws Exception {

    if (!snapshotRunnableFuture.isDone()) {
        snapshotRunnableFuture.run();
    }

    final KeyedStateHandle result = snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
    if (result == null) {
        // Nothing to register.
        return null;
    }

    result.registerSharedStates(sharedStateRegistry);
    return result;
}
/**
 * Cancels a snapshot future that was never run and checks that the RocksDB objects
 * have been released.
 */
@Test
public void testDismissingSnapshot() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        final RunnableFuture<SnapshotResult<KeyedStateHandle>> neverRun =
            keyedStateBackend.snapshot(
                0L,
                0L,
                testStreamFactory,
                CheckpointOptions.forCheckpointWithDefaultLocation());

        neverRun.cancel(true);
        verifyRocksObjectsReleased();
    } finally {
        // Always tear the backend down so later tests start from a clean state.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
/**
 * Runs the given snapshot on this thread unless it is already done, then returns the
 * job-manager-owned state handle of its result.
 *
 * @param snapshotRunnableFuture the snapshot to complete
 * @return the job-manager-owned handle of the snapshot result
 * @throws Exception if running or retrieving the snapshot fails
 */
private OperatorStateHandle runSnapshot(
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunnableFuture) throws Exception {

    if (snapshotRunnableFuture.isDone()) {
        return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
    }

    snapshotRunnableFuture.run();
    return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
}