下面列出了 java.util.concurrent.FutureTask#run() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a composite task that first reformats {@code file} and then optimizes its imports.
 *
 * <p>Both sub-tasks are obtained from their processors when this method is called, but they are
 * only executed when the returned task itself is run. The import step is always requested with
 * {@code processChangedTextOnly = false}.
 *
 * @param file the file to process
 * @param processChangedTextOnly forwarded to the reformat processor only
 * @return a task yielding {@code true} only when both steps yielded {@code true} and neither
 *     reports itself as cancelled
 */
@Override
@NotNull
public FutureTask<Boolean> preprocessFile(@NotNull PsiFile file, boolean processChangedTextOnly) throws IncorrectOperationException {
    final FutureTask<Boolean> reformatStep = myReformatCodeProcessor.preprocessFile(file, processChangedTextOnly);
    final FutureTask<Boolean> importsStep = myOptimizeImportsProcessor.preprocessFile(file, false);
    final Callable<Boolean> pipeline = new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            // Step 1: reformat, executed inline on the thread that runs the composite task.
            reformatStep.run();
            final boolean reformatted = reformatStep.get();
            if (!reformatted || reformatStep.isCancelled()) {
                return false;
            }
            // Step 2: optimize imports with sequential processing switched off; the flag is
            // always switched back on, even when the import step fails.
            CodeStyleManagerImpl.setSequentialProcessingAllowed(false);
            try {
                importsStep.run();
                final boolean optimized = importsStep.get();
                return optimized && !importsStep.isCancelled();
            }
            finally {
                CodeStyleManagerImpl.setSequentialProcessingAllowed(true);
            }
        }
    };
    return new FutureTask<Boolean>(pipeline);
}
/**
 * Computes the value for {@code arg}, or returns the value of a computation for the same key
 * that is already cached or in flight.
 *
 * <p>The first caller for a key registers a {@link FutureTask} in the cache and runs it on its
 * own thread; other callers for the same key wait on that task's result.
 *
 * <p>If the computation fails or is cancelled, the error is logged, the failed entry is evicted
 * (only if it is still the entry this call observed) and {@code null} is returned.
 *
 * @param arg key
 * @param c computation used to produce the value when none is cached
 *
 * @return the computed value, or {@code null} if the computation failed or was cancelled
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for a
 *     computation owned by another thread; the cache entry is left in place in that case
 */
public V compute(final K arg, final Computable<K, V> c) throws InterruptedException {
    Future<V> future = cache.get(arg);
    if (future == null) {
        FutureTask<V> futureTask = new FutureTask<V>(new Callable<V>() {
            @Override
            public V call() throws Exception {
                return c.compute(arg);
            }
        });
        // Only one task per key wins the race; losers wait on the winner's future.
        future = cache.putIfAbsent(arg, futureTask);
        if (future == null) {
            future = futureTask;
            futureTask.run();
        }
    }
    try {
        return future.get();
    } catch (InterruptedException e) {
        // Only a waiter can be interrupted here: the computation itself is still owned by
        // another thread, so keep its cache entry and let the caller see the interrupt
        // instead of silently turning it into a null result.
        throw e;
    } catch (Exception e) {
        LOGGER.error("Get computed result with error - " + e.getMessage(), e);
        // Conditional remove: never evict a newer entry another thread may have installed.
        cache.remove(arg, future);
        return null;
    }
}
/**
 * Builds a task that runs the previous processor's task (if there is one) and then this
 * processor's own task inside a write action.
 *
 * <p>Both sub-tasks are prepared when this method is called; they are executed only when the
 * returned task is run.
 *
 * @param file the file to process
 * @param processChangedTextOnly forwarded to both the previous processor and {@code prepareTask}
 * @return a task yielding {@code false} as soon as the previous task yields {@code false} or is
 *     cancelled, otherwise the outcome of this processor's own task
 */
public FutureTask<Boolean> preprocessFile(@Nonnull PsiFile file, boolean processChangedTextOnly) throws IncorrectOperationException {
    final FutureTask<Boolean> upstream = getPreviousProcessorTask(file, processChangedTextOnly);
    final FutureTask<Boolean> own = prepareTask(file, processChangedTextOnly);
    return new FutureTask<>(() -> {
        try {
            if (upstream != null) {
                upstream.run();
                final boolean upstreamSucceeded = upstream.get();
                if (!upstreamSucceeded || upstream.isCancelled()) {
                    return false;
                }
            }
            // The processor's own work is executed through the application's write action.
            ApplicationManager.getApplication().runWriteAction(own);
            final boolean ownSucceeded = own.get();
            return ownSucceeded && !own.isCancelled();
        }
        catch (ExecutionException e) {
            // Give the helper a chance to rethrow the cause directly; if it returns,
            // fall back to propagating the wrapper.
            ExceptionUtil.rethrowUnchecked(e.getCause());
            throw e;
        }
    });
}
/**
 * Captures a bitmap of {@code view} by running the screenshot callable on the main thread and
 * waiting for its result.
 *
 * @param view the view to capture; must not be null
 * @return the bitmap produced by the screenshot callable
 * @throws InterruptedException if interrupted while waiting for the result
 * @throws ExecutionException if the screenshot callable throws
 */
private static Bitmap captureViewBasedImpl(@NonNull final View view)
        throws InterruptedException, ExecutionException {
    checkNotNull(
            view,
            "Taking view based screenshot requires using either takeScreenshot(view) or"
                    + " takeScreenshot(activity) where view and activity are non-null.");
    Callable<Bitmap> screenshotCallable = takeScreenshotCallableFactory.create(view);
    FutureTask<Bitmap> screenshotTask = new FutureTask<>(screenshotCallable);
    boolean onMainThread = Looper.myLooper() == Looper.getMainLooper();
    if (!onMainThread) {
        // Hand the task to the instrumentation so it executes on the main thread.
        InstrumentationRegistry.getInstrumentation().runOnMainSync(screenshotTask);
    } else {
        // Already on the main thread: just execute the task inline.
        screenshotTask.run();
    }
    return screenshotTask.get(); // Blocks
}
/**
 * Asserts that {@code f} is not done and, when it is a {@link FutureTask}, that invoking its run
 * methods again leaves it not done. For a {@code PublicFutureTask} it additionally asserts that
 * {@code runAndReset()} returns false and that the run count is unchanged.
 */
void checkIsRunning(Future<?> f) {
    checkNotDone(f);
    if (!(f instanceof FutureTask)) {
        return;
    }
    // Check that run methods do nothing
    FutureTask<?> futureTask = (FutureTask<?>) f;
    futureTask.run();
    if (f instanceof PublicFutureTask) {
        PublicFutureTask publicTask = (PublicFutureTask) f;
        int runCountBefore = publicTask.runCount();
        publicTask.run();
        assertFalse(publicTask.runAndReset());
        assertEquals(runCountBefore, publicTask.runCount());
    }
    checkNotDone(f);
}
/**
 * timed get with most negative timeout works correctly (i.e. no
 * underflow bug): a timed get on a task that has not been run must
 * time out for zero, negative and Long.MIN_VALUE timeouts alike.
 */
public void testGet_NegativeInfinityTimeout() throws Exception {
    final int workerCount = 10;
    final ExecutorService executor = Executors.newFixedThreadPool(workerCount);
    final Runnable noOp = new Runnable() {
        public void run() {
        }
    };
    // This task is deliberately not run until the finally block below.
    final FutureTask<Void> pending = new FutureTask<>(noOp, null);
    final long[] timeouts = { 0L, -1L, Long.MIN_VALUE };
    final Runnable waiter = new Runnable() {
        public void run() {
            for (int i = 0; i < timeouts.length; i++) {
                try {
                    pending.get(timeouts[i], NANOSECONDS);
                    shouldThrow();
                } catch (TimeoutException success) {
                    // expected: the task has not completed
                } catch (Throwable fail) {
                    threadUnexpectedException(fail);
                }
            }
        }
    };
    final List<Future<?>> submitted = new ArrayList<>();
    for (int i = 0; i < workerCount; i++) {
        submitted.add(executor.submit(waiter));
    }
    try {
        joinPool(executor);
        for (Future<?> done : submitted) {
            checkCompletedNormally(done, null);
        }
    } finally {
        pending.run(); // last resort to help terminate
    }
}
/**
 * Arranges for the supplied FutureTask to run on the UI thread.
 *
 * <p>When called on the UI thread the task is run inline, so this call returns only after the
 * task has finished. Otherwise the task is handed to {@code postOnUiThread} and this method
 * itself does not wait on the task.
 *
 * @param task The FutureTask to run
 * @return The same task that was passed in (to aid inline construction)
 */
public static <T> FutureTask<T> runOnUiThread(FutureTask<T> task) {
    if (!runningOnUiThread()) {
        postOnUiThread(task);
        return task;
    }
    task.run();
    return task;
}
/**
 * Verifies that submitting a second refresh while the first one is still blocked marks the
 * first request as cancelled, and that the second task's value is the one finally published.
 */
@Test
public void refreshShouldCancelRunningTaskWhenNewTaskIsSubmitted() throws Exception {
    // Signal used by the first task to tell the test that it has started.
    final FutureTask<Object> firstTaskStarted = new FutureTask<>(() -> null);
    // Gate the first task blocks on until the test runs it.
    final FutureTask<String> gate = new FutureTask<>(() -> "first task");
    final AtomicReference<Refreshable.Request> capturedRequest = new AtomicReference<>();
    final Refreshable.Callback<String> blockingTask = (request) -> {
        capturedRequest.set(request);
        firstTaskStarted.run();
        return gate.get();
    };
    final Callable<String> quickTask = () -> "second task";

    value.refresh(blockingTask);
    firstTaskStarted.get(); // wait for first task to start running.
    assertNull("should have blocked on the dependency", value.getNow());
    checkLog("BUSY: null");

    value.refresh(quickTask);
    assertTrue("should have cancelled first task", capturedRequest.get().isCancelled());
    checkLog();

    gate.run(); // Make first task exit, allowing second to run.
    expectUnpublish();
    assertEquals("second task", value.getWhenReady());
    checkLog("unpublished: first task",
             "BUSY: second task",
             "IDLE: second task");
}
/**
 * Arranges for the supplied FutureTask to run on the UI thread.
 *
 * <p>When called on the UI thread the task is run inline, so this call returns only after the
 * task has finished. Otherwise the task is handed to {@code postOnUiThread} and this method
 * itself does not wait on the task.
 *
 * @param task The FutureTask to run
 * @return The same task that was passed in (to aid inline construction)
 */
public static <T> FutureTask<T> runOnUiThread(FutureTask<T> task) {
    if (!runningOnUiThread()) {
        postOnUiThread(task);
        return task;
    }
    task.run();
    return task;
}
/**
 * Initializes a new cache value to associate with {@code key}.
 *
 * <p>Does nothing when {@code key} is null or the cache already contains an entry for it.
 * Otherwise the {@code create} call is wrapped in a {@link FutureTask}, the task is stored in
 * the cache, and the task is then run directly on the calling thread, so this method returns
 * once {@code task.run()} has returned.
 *
 * <p>NOTE(review): {@code containsKey} followed by {@code put} is not a single atomic step; if
 * this method can be called concurrently for the same key, confirm that is acceptable.
 *
 * @param key key to associate with the created cache value; may be null (no-op)
 */
public void init(K key) {
    if (key == null) {
        return;
    }
    CacheReference<K> cacheKey = keyFactory.createKey(key, queue);
    if (cache.containsKey(cacheKey)) {
        return;
    }
    FutureTask<CacheReference<V>> creation = new FutureTask<>(() -> {
        // create(key) must not yield null; requireNonNull turns that into a failure of the task.
        V freshValue = requireNonNull(create(key));
        return valueFactory.createValue(freshValue, queue);
    });
    // Publish the task before running it, so the entry exists while the value is being created.
    cache.put(cacheKey, creation);
    creation.run();
}
/**
 * Arranges for the supplied FutureTask to run on the UI thread.
 *
 * <p>When called on the UI thread the task is run inline, so this call returns only after the
 * task has finished. Otherwise the task is handed to {@code postOnUiThread} and this method
 * itself does not wait on the task.
 *
 * @param task The FutureTask to run
 * @return The same task that was passed in (to aid inline construction)
 */
public static <T> FutureTask<T> runOnUiThread(FutureTask<T> task) {
    if (!runningOnUiThread()) {
        postOnUiThread(task);
        return task;
    }
    task.run();
    return task;
}
/**
 * Check that we do not stop responding when trying to read after adding null readers.
 */
@Test
@Category(value = ExcludeFromGatedCheckin.class)
public final void testReadFromNullReader() throws SQLException {
    // The code below exposes a flaw in our current implementation related to
    // CompleteResults semantics and the internal c-tor. The flaw does not
    // leak out to customers because the MultiShardStatement object manages the
    // necessary logic, but we need to patch the flaw so it doesn't end up
    // inadvertently leaking out to customers.
    // See VSTS 2616238 (I believe). Philip will be modifying logic and
    // augmenting tests to deal with this issue.

    // Pass a null reader and verify that read goes through and does not terminate.
    LabeledResultSet[] readers = { getReader(conn1, "select 1", "Test0"), null };

    try (MultiShardResultSet resultSet = new MultiShardResultSet(Arrays.asList(readers))) {
        // Drains the result set, counting the rows it yields.
        FutureTask<Integer> drain = new FutureTask<>(() -> {
            int rows = 0;
            while (resultSet.next()) {
                rows++;
            }
            return rows;
        });
        // NOTE(review): run() executes the drain on this thread, so by the time it returns
        // the task has already completed; the sleep and isDone() check cannot detect a hang.
        drain.run();
        Thread.sleep(500);
        Assert.assertTrue("Read did not respond on the garbage reader.", drain.isDone());
    }
    catch (SQLException | InterruptedException e) {
        Assert.fail(e.getMessage());
    }
}
/**
 * Runs {@code futureToRun} on the main thread and waits until it has been executed.
 *
 * <p>When called on the main thread the task is run inline. Otherwise it is posted to
 * {@code mainHandler} and the calling thread waits on a latch that is released once the posted
 * runnable has finished (whether or not the task threw).
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt status is restored. If
 * the task had not completed at that point a {@link RuntimeException} is thrown; if it had
 * already completed, the task is returned normally.
 *
 * @param futureToRun the task to execute on the main thread
 * @return {@code futureToRun}, for call chaining
 * @throws RuntimeException if interrupted before the task completed
 */
private <T> FutureTask<T> runOnMainThread(final FutureTask<T> futureToRun) {
    if (Looper.myLooper() != Looper.getMainLooper()) {
        final CountDownLatch latch = new CountDownLatch(1);
        mainHandler.post(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            futureToRun.run();
                        } finally {
                            // Always release the waiter, even if run() throws.
                            latch.countDown();
                        }
                    }
                });
        try {
            latch.await();
        } catch (InterruptedException ie) {
            // Catching InterruptedException clears the flag; restore it so code further up
            // the stack can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            if (!futureToRun.isDone()) {
                throw new RuntimeException("Interrupted while waiting for task to complete.", ie);
            }
        }
    } else {
        futureToRun.run();
    }
    return futureToRun;
}
/**
 * Verifies that submitting a second refresh while the first one is still blocked marks the
 * first request as cancelled, and that the second task's value is the one finally published.
 */
@Test
public void refreshShouldCancelRunningTaskWhenNewTaskIsSubmitted() throws Exception {
    // Signal used by the first task to tell the test that it has started.
    final FutureTask<Object> firstTaskStarted = new FutureTask<>(() -> null);
    // Gate the first task blocks on until the test runs it.
    final FutureTask<String> gate = new FutureTask<>(() -> "first task");
    final AtomicReference<Refreshable.Request> capturedRequest = new AtomicReference<>();
    final Refreshable.Callback<String> blockingTask = (request) -> {
        capturedRequest.set(request);
        firstTaskStarted.run();
        return gate.get();
    };
    final Callable<String> quickTask = () -> "second task";

    value.refresh(blockingTask);
    firstTaskStarted.get(); // wait for first task to start running.
    assertNull("should have blocked on the dependency", value.getNow());
    checkLog("BUSY: null");

    value.refresh(quickTask);
    assertTrue("should have cancelled first task", capturedRequest.get().isCancelled());
    checkLog();

    gate.run(); // Make first task exit, allowing second to run.
    expectUnpublish();
    assertEquals("second task", value.getWhenReady());
    checkLog("unpublished: first task",
             "BUSY: second task",
             "IDLE: second task");
}
/**
 * Returns the value for {@code key}, computing it via {@code c} when no computation for that
 * key is cached yet.
 *
 * <p>The first caller for a key registers a {@link FutureTask} in the cache and runs it on its
 * own thread; other callers for the same key wait on that task.
 *
 * <p>If the computation fails with a {@link NoClassDefFoundError}, {@code null} is returned and
 * the failed entry stays cached. Any other failure is printed, the failed entry is evicted and
 * the computation is retried.
 *
 * <p>NOTE(review): the retry is unbounded, so a computation that fails permanently keeps this
 * method looping; confirm whether a retry limit or a rethrow is wanted.
 *
 * @param key the key to compute a value for
 * @return the computed value, or {@code null} if the computation failed with a
 *     {@link NoClassDefFoundError}
 * @throws InterruptedException if interrupted while waiting for the computation
 */
public V compute(final K key) throws InterruptedException {
    while (true) {
        Future<V> future = cache.get(key);
        if (future == null) {
            final Callable<V> eval = new Callable<V>() {
                public V call() throws Exception {
                    return c.compute(key);
                }
            };
            final FutureTask<V> futureTask = new FutureTask<>(eval);
            // Only one task per key wins the race; losers wait on the winner's future.
            future = cache.putIfAbsent(key, futureTask);
            if (future == null) {
                future = futureTask;
                futureTask.run();
            }
        }
        try {
            return future.get();
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof NoClassDefFoundError) {
                return null;
            }
            e.printStackTrace();
            // Evict the failed future (only if it is still the cached one). Without this the
            // next iteration would fetch the same failed future and spin forever without ever
            // recomputing.
            cache.remove(key, future);
        }
    }
}
/**
 * Unit test for the distinct-binding-sets operator: the operator is configured
 * to be distinct on the single variable {@code x}, is fed {@code setup.data}
 * as one chunk, and must emit exactly the four expected solutions (in any
 * order).
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@SuppressWarnings("rawtypes")
public void test_distinctBindingSets() throws InterruptedException,
ExecutionException {
final UUID queryId = UUID.randomUUID();
final Var<?> x = Var.var("x");
// final Var<?> y = Var.var("y");
// DISTINCT is computed over this variable list only.
final IVariable<?>[] vars = new IVariable[]{x};
final int distinctId = 1;
// Build the operator under test with its annotations.
final PipelineOp query = newDistinctBindingSetsOp(new BOp[]{},
new NV(HTreeDistinctBindingSetsOp.Annotations.BOP_ID,distinctId),//
new NV(HTreeDistinctBindingSetsOp.Annotations.VARIABLES,vars),//
new NV(HTreeDistinctBindingSetsOp.Annotations.NAMED_SET_REF,
NamedSolutionSetRefUtility.newInstance(queryId, getName(), vars)),//
new NV(PipelineOp.Annotations.EVALUATION_CONTEXT,
BOpEvaluationContext.CONTROLLER),//
new NV(PipelineOp.Annotations.SHARED_STATE, true),//
new NV(PipelineOp.Annotations.MAX_PARALLEL, 1),//
new NV(IPredicate.Annotations.RELATION_NAME, new String[]{"dummy"})
);
// the expected solutions
final IBindingSet[] expected = new IBindingSet[] {//
new ListBindingSet(//
new IVariable[] { x },//
new IConstant[] { new Constant<IV>(setup.john) }//
),//
new ListBindingSet(//
new IVariable[] { x },//
new IConstant[] { new Constant<IV>(setup.mary) }//
), new ListBindingSet(//
new IVariable[] { x },//
new IConstant[] { new Constant<IV>(setup.paul) }//
), new ListBindingSet(//
new IVariable[] { x },//
new IConstant[] { new Constant<IV>(setup.leon) }//
), };
final MockQueryContext queryContext = new MockQueryContext(queryId);
try {
final BOpStats stats = query.newStats();
// The source delivers all of setup.data as a single chunk.
final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
new IBindingSet[][] { setup.data.toArray(new IBindingSet[0]) });
final IBlockingBuffer<IBindingSet[]> sink = new BlockingBufferWithStats<IBindingSet[]>(
query, stats);
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
new MockRunningQuery(null/* fed */, null/* indexManager */,
queryContext), -1/* partitionId */, stats,
query/* op */, false/* lastInvocation */, source, sink,
null/* sink2 */);
// get task.
final FutureTask<Void> ft = query.eval(context);
// execute task.
// jnl.getExecutorService().execute(ft);
// The task is run directly on the test thread rather than on an executor.
ft.run();
// The future is passed along with the sink's iterator to the comparison helper.
AbstractQueryEngineTestCase.assertSameSolutionsAnyOrder("", expected,
sink.iterator(), ft);
// assertTrue(ft.isDone());
// assertFalse(ft.isCancelled());
// ft.get(); // verify nothing thrown.
// One chunk of six solutions in, one chunk of four distinct solutions out.
assertEquals(1L, stats.chunksIn.get());
assertEquals(6L, stats.unitsIn.get());
assertEquals(4L, stats.unitsOut.get());
assertEquals(1L, stats.chunksOut.get());
} finally {
// Always release the mock query context, even when an assertion fails.
queryContext.close();
}
}
/**
 * Unit test where the offset plus the limit is never satisfied. For this
 * test, all binding sets will be consumed and some will be emitted, but the
 * slice is never satisfied (offset=2 and limit=10; the assertions below
 * expect six solutions seen and four accepted).
 *
 * @throws InterruptedException
 * @throws ExecutionException
 */
public void test_slice_offsetPlusLimitNeverSatisfied() throws InterruptedException,
ExecutionException {
final Var<?> x = Var.var("x");
final Var<?> y = Var.var("y");
final int bopId = 1;
final long offset = 2L;
final long limit = 10L;
// Build the slice operator under test.
final SliceOp query = new SliceOp(new BOp[] {}, NV.asMap(new NV[] {//
new NV(SliceOp.Annotations.BOP_ID, bopId),//
new NV(SliceOp.Annotations.OFFSET, offset),//
new NV(SliceOp.Annotations.LIMIT, limit),//
new NV(SliceOp.Annotations.EVALUATION_CONTEXT,
BOpEvaluationContext.CONTROLLER),//
new NV(PipelineOp.Annotations.SHARED_STATE,true),//
new NV(PipelineOp.Annotations.REORDER_SOLUTIONS,false),//
}));
// Sanity check: the operator reports the configured offset and limit.
assertEquals("offset", offset, query.getOffset());
assertEquals("limit", limit, query.getLimit());
// the expected solutions
final IBindingSet[] expected = new IBindingSet[] {//
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Mary"),
new Constant<String>("Jane") }//
),//
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Paul"),
new Constant<String>("Leon") }//
),//
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Paul"),
new Constant<String>("John") }//
),//
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Leon"),
new Constant<String>("Paul") }//
),//
};
final SliceStats stats = query.newStats();
// The source delivers all of the test data as a single chunk.
final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
final IBlockingBuffer<IBindingSet[]> sink = new BlockingBufferWithStats<IBindingSet[]>(query, stats);
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
new MockRunningQuery(null/* fed */, null/* indexManager */
, sink), -1/* partitionId */, stats, query/* op */,
false/* lastInvocation */, source, sink, null/* sink2 */);
// get task.
final FutureTask<Void> ft = query.eval(context);
// Run the task directly on the test thread.
ft.run();
/*
* Note: When the slice does not have a limit (or if we write a test
* where the #of source binding sets can not satisfy the offset and/or
* limit) then the sink WILL NOT be closed by the slice. Therefore, in
* order for the iterator to terminate we first check the Future of the
* SliceTask and then _close_ the sink before consuming the iterator.
*/
assertTrue(ft.isDone());
assertFalse(ft.isCancelled());
ft.get(); // verify nothing thrown.
sink.close(); // close the sink so the iterator will terminate!
AbstractQueryEngineTestCase.assertSameSolutions(expected, sink.iterator());
// One chunk of six solutions in, one chunk of four solutions out.
assertEquals(1L, stats.chunksIn.get());
assertEquals(6L, stats.unitsIn.get());
assertEquals(4L, stats.unitsOut.get());
assertEquals(1L, stats.chunksOut.get());
// Slice-specific counters: every input was seen, four were accepted.
assertEquals(6L, stats.nseen.get());
assertEquals(4L, stats.naccepted.get());
}
/**
 * Unit test for correct visitation with offset=1 and limit=3: three
 * solutions are expected, in order.
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
public void test_slice_offset1_limit3() throws InterruptedException,
ExecutionException {
final Var<?> x = Var.var("x");
final Var<?> y = Var.var("y");
final int bopId = 1;
final long offset = 1;
final long limit = 3;
// Build the slice operator under test.
final SliceOp query = new SliceOp(new BOp[]{},
NV.asMap(new NV[]{//
new NV(SliceOp.Annotations.BOP_ID, bopId),//
new NV(SliceOp.Annotations.OFFSET, offset),//
new NV(SliceOp.Annotations.LIMIT, limit),//
new NV(SliceOp.Annotations.EVALUATION_CONTEXT,
BOpEvaluationContext.CONTROLLER),//
new NV(PipelineOp.Annotations.SHARED_STATE,true),//
new NV(PipelineOp.Annotations.REORDER_SOLUTIONS,false),//
}));
// Sanity check: the operator reports the configured offset and limit.
assertEquals("offset", offset, query.getOffset());
assertEquals("limit", limit, query.getLimit());
// the expected solutions
final IBindingSet[] expected = new IBindingSet[] {//
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Mary"),
new Constant<String>("Paul"), }//
),
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Mary"),
new Constant<String>("Jane") }//
),
new ListBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Paul"),
new Constant<String>("Leon") }//
), };
final SliceStats stats = query.newStats();
// The source delivers all of the test data as a single chunk.
final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
final IBlockingBuffer<IBindingSet[]> sink = new BlockingBufferWithStats<IBindingSet[]>(query, stats);
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
new MockRunningQuery(null/* fed */, null/* indexManager */
, sink), -1/* partitionId */, stats, query/* op */,
false/* lastInvocation */, source, sink, null/* sink2 */);
// get task.
final FutureTask<Void> ft = query.eval(context);
// Run the task directly on the test thread.
ft.run();
// Unlike the never-satisfied case, the sink is consumed here without an explicit close().
AbstractQueryEngineTestCase.assertSameSolutions(expected, sink.iterator());
assertTrue(ft.isDone());
assertFalse(ft.isCancelled());
ft.get(); // verify nothing thrown.
// Slice-specific counters: accepted == limit, seen == offset + limit.
assertEquals(limit, stats.naccepted.get());
assertEquals(offset+limit, stats.nseen.get());
// One chunk in and one chunk out; four solutions counted in, three out.
assertEquals(1L, stats.chunksIn.get());
assertEquals(4L, stats.unitsIn.get());
assertEquals(3L, stats.unitsOut.get());
assertEquals(1L, stats.chunksOut.get());
}
/**
 * Runs {@code task} on the current thread, then checks that it reports done and that its
 * cancelled state is the same as it was before the run.
 */
static <V> void run(FutureTask<V> task) {
    final boolean cancelledBefore = task.isCancelled();
    task.run();
    final boolean doneAfter = task.isDone();
    check(doneAfter);
    final boolean cancelledAfter = task.isCancelled();
    equal(cancelledBefore, cancelledAfter);
}
/**
 * Creates a new, uniquely named namespace through {@code rmgr} and records it in the shared
 * test state.
 *
 * <p>The work is wrapped in a {@link FutureTask} that is registered via {@code begin}, run on
 * the calling thread, and checked with {@code get()}; {@code done} is always invoked after the
 * task has been run.
 *
 * <p>NOTE(review): the original comment said NOT to assign the namespace until the task
 * executes, yet the name (and the create-counter increment) is taken here, before the task
 * runs. Confirm which is intended.
 *
 * @param rmgr the repository manager used to create the namespace
 * @param uuid passed through to {@code begin} and {@code done}
 * @throws Exception if creating the namespace fails
 */
@Override
protected void doApply(final RemoteRepositoryManager rmgr,
        final UUID uuid) throws Exception {
    final String namespace = "n"
            + sharedTestState.namespaceCreateCounter.incrementAndGet();
    final Callable<Void> createNamespace = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            // Note: Wrap properties to avoid modification!
            final Properties properties = new Properties(
                    sharedTestState.testMode.getProperties());
            // create namespace.
            rmgr.createRepository(namespace, properties);
            // add entry IFF created.
            final Object previous = sharedTestState.namespaces.putIfAbsent(namespace,
                    new ReentrantReadWriteLock());
            if (previous != null) {
                // Should not exist! Each namespace name is distinct!!!
                throw new AssertionError("namespace=" + namespace);
            }
            // Track #of namespaces that exist in the service.
            sharedTestState.namespaceExistCounter.incrementAndGet();
            return null;
        }
    };
    // Wrap as FutureTask.
    final FutureTask<Void> ft = new FutureTask<Void>(createNamespace);
    begin(namespace, uuid, ft);
    ft.run(); // run in our thread.
    try {
        ft.get(); // check future.
    } finally {
        done(namespace, uuid);
    }
}