下面列出了 java.util.concurrent.FutureTask 类的 API 用法示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Binds this handler to the given delegator.
 *
 * <p>Records the delegator, its name and its entity ECA reader name, then
 * tries to install a task that creates the dispatch context. The task is
 * submitted to the global batch pool only if no task had been installed
 * before. Finally the entity ECA cache for the reader name is requested so
 * that it is loaded up front.
 *
 * @param delegator the delegator this handler works for
 */
public void setDelegator(Delegator delegator) {
    this.delegator = delegator;
    this.delegatorName = delegator.getDelegatorName();
    this.entityEcaReaderName = EntityEcaUtil.getEntityEcaReaderName(delegator.getDelegatorBaseName());

    FutureTask<DispatchContext> contextTask = new FutureTask<DispatchContext>(
            new Callable<DispatchContext>() {
                public DispatchContext call() {
                    return EntityServiceFactory.getDispatchContext(DelegatorEcaHandler.this.delegator);
                }
            });

    // Only the task that wins the compare-and-set is handed to the pool.
    boolean installed = this.dctx.compareAndSet(null, contextTask);
    if (installed) {
        ExecutionPool.GLOBAL_BATCH.submit(contextTask);
    }

    // Preload the cache.
    EntityEcaUtil.getEntityEcaCache(this.entityEcaReaderName);
}
/**
 * Schedules the given contents to be added to the universe.
 *
 * <p>The actual work is wrapped in a task that is submitted to
 * {@code executors} from a job running on {@code launchers}. Any
 * {@link Throwable} raised while adding is printed through {@code IJError}
 * and the task then yields {@code null}.
 *
 * @param col the contents to add
 * @return a future for the collection of per-content futures
 */
public Future<Collection<Future<Content>>> addContent(final Collection<Content> col) {
    final Callable<Collection<Future<Content>>> adder = new Callable<Collection<Future<Content>>>() {
        @Override
        public Collection<Future<Content>> call() {
            Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
            Collection<Future<Content>> added;
            try {
                added = universe.addContentLater(col);
            } catch (final Throwable t) {
                IJError.print(t);
                added = null;
            }
            return added;
        }
    };
    final FutureTask<Collection<Future<Content>>> pending =
            new FutureTask<Collection<Future<Content>>>(adder);

    final Runnable launcher = new Runnable() {
        @Override
        public void run() {
            executors.submit(pending);
        }
    };
    launchers.submit(launcher);
    return pending;
}
/**
 * Submits the given task to the wrapped executor.
 *
 * <p>When the wrapped executor is an {@link ExecutorService} its own
 * {@code submit} is used; otherwise the task is wrapped in a
 * {@link FutureTask} (with a {@code null} result) and passed to
 * {@code execute}.
 *
 * @param task the task to run
 * @return a future representing the pending task
 * @throws TaskRejectedException if the wrapped executor rejects the task
 */
@Override
public Future<?> submit(Runnable task) {
    try {
        if (!(this.concurrentExecutor instanceof ExecutorService)) {
            FutureTask<Object> wrapped = new FutureTask<Object>(task, null);
            this.concurrentExecutor.execute(wrapped);
            return wrapped;
        }
        ExecutorService service = (ExecutorService) this.concurrentExecutor;
        return service.submit(task);
    } catch (RejectedExecutionException ex) {
        throw new TaskRejectedException(
                "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
    }
}
/**
 * Starts generating the authentication encryption key on a background
 * thread, unless a key already exists or a generation task has already been
 * created. The check and the task creation happen under {@code sLock}.
 */
private static void triggerMacKeyGeneration() {
    synchronized (sLock) {
        boolean generationNeeded = sKey == null && sMacKeyGenerator == null;
        if (generationNeeded) {
            Callable<SecretKey> keyFactory = new Callable<SecretKey>() {
                // SecureRandomInitializer addresses the bug in SecureRandom that "TrulyRandom"
                // warns about, so this lint warning can safely be suppressed.
                @SuppressLint("TrulyRandom")
                @Override
                public SecretKey call() throws Exception {
                    SecureRandom random = new SecureRandom();
                    SecureRandomInitializer.initialize(random);
                    KeyGenerator generator = KeyGenerator.getInstance(MAC_ALGORITHM_NAME);
                    generator.init(MAC_KEY_BYTE_COUNT * 8, random);
                    return generator.generateKey();
                }
            };
            sMacKeyGenerator = new FutureTask<SecretKey>(keyFactory);
            AsyncTask.THREAD_POOL_EXECUTOR.execute(sMacKeyGenerator);
        }
    }
}
/**
 * Starts a two-tracker cluster whose reserved node memory is set to
 * {@code Integer.MAX_VALUE}, checks that a submitted sleep job is not done,
 * then drops the reserved memory to zero and expects the job to succeed.
 * Cluster manager metrics are verified afterwards.
 */
public void testMemoryLimit() throws Exception {
    LOG.info("Starting testMemoryLimit");

    JobConf clusterConf = new JobConf();
    clusterConf.setInt(CoronaConf.NODE_RESERVED_MEMORY_MB, Integer.MAX_VALUE);
    corona = new MiniCoronaCluster.Builder().conf(clusterConf).numTaskTrackers(2).build();
    final JobConf jobConf = corona.createJobConf();

    final long startMillis = System.currentTimeMillis();
    FutureTask<Boolean> sleepJob = submitSleepJobFutureTask(jobConf);
    checkTaskNotDone(sleepJob, 10);

    // Lift the memory reservation so the job can finally be scheduled.
    corona.getClusterManager().getNodeManager().getResourceLimit().setNodeReservedMemoryMB(0);
    Assert.assertTrue(sleepJob.get());
    final long elapsedMillis = System.currentTimeMillis() - startMillis;

    LOG.info("Task Done. Verifying");
    new ClusterManagerMetricsVerifier(corona.getClusterManager(),
            1, 1, 1, 1, 1, 1, 0, 0).verifyAll();
    LOG.info("Time spent for testMemoryLimit:" + elapsedMillis);
}
/**
 * Runs an endless script through a {@link FutureTask}, expects the timed
 * {@code get} to time out, cancels the task and checks that the result
 * placeholder was left untouched.
 */
@Test
public void testFuture() throws Exception {
    JexlScript endless = JEXL.createScript("while(true);");
    FutureTask<Object> pending = new FutureTask<Object>(endless.callable(null));
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.submit(pending);

    Object result = 42;
    try {
        result = pending.get(100, TimeUnit.MILLISECONDS);
        Assert.fail("should have timed out");
    } catch (TimeoutException expected) {
        // Timing out is the expected outcome; stop the endless script.
        pending.cancel(true);
    } finally {
        pool.shutdown();
    }

    Assert.assertTrue(pending.isCancelled());
    Assert.assertEquals(42, result);
}
/**
 * Runs {@code writeDirectoryAction} inside a write action that is queued
 * with {@code invokeLater}, and blocks until it has produced its result.
 *
 * @param dir        the target directory; when {@code null} nothing is done
 * @param dirWrapper the directory description passed to the write action
 * @param project    the project passed to the write action
 * @return the directory returned by the write action, or {@code null} when
 *         {@code dir} is {@code null}, the action failed, or the waiting
 *         thread was interrupted
 */
public static PsiDirectory writeDirectory(PsiDirectory dir, DirectoryWrapper dirWrapper, Project project) {
    if (dir == null) {
        //todo print error
        return null;
    }
    RunnableFuture<PsiDirectory> runnableFuture = new FutureTask<>(() ->
            ApplicationManager.getApplication().runWriteAction(new Computable<PsiDirectory>() {
                @Override
                public PsiDirectory compute() {
                    return writeDirectoryAction(dir, dirWrapper, project);
                }
            }));
    ApplicationManager.getApplication().invokeLater(runnableFuture);
    try {
        return runnableFuture.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        Logger.log("runnableFuture " + e.getMessage());
        Logger.printStack(e);
    } catch (ExecutionException e) {
        Logger.log("runnableFuture " + e.getMessage());
        Logger.printStack(e);
    }
    return null;
}
/**
 * Records the deployment operation and returns a task that waits for
 * {@code deploymentDoneLatch} before yielding the stored deployment
 * results. {@code scanDoneLatch} is counted down in every case, including
 * when submitting the task fails.
 *
 * @param operation       the deployment operation to record
 * @param executorService the executor the waiting task is submitted to
 * @return a future for the deployment results
 */
@Override
public Future<ModelNode> deploy(final ModelNode operation, final ExecutorService executorService) {
    try {
        deploymentOperation.set(operation);
        final Callable<ModelNode> awaitResults = new Callable<ModelNode>() {
            @Override
            public ModelNode call() throws Exception {
                deploymentDoneLatch.await();
                return deploymentResults.get();
            }
        };
        final FutureTask<ModelNode> pending = new FutureTask<ModelNode>(awaitResults);
        executorService.submit(pending);
        return pending;
    } finally {
        scanDoneLatch.countDown();
    }
}
/**
 * Blocks until the given condition evaluates to true. The condition is evaluated every
 * {@code frequency} milliseconds, so the given condition should be an idempotent operation.
 * If the condition is not met within the given timeout, the polling task is cancelled and the
 * test is failed through {@code Assert.fail}.
 *
 * <p>The polling thread is always released before this method returns. If the calling thread
 * is interrupted while waiting, its interrupt flag is restored before the failure is raised.
 *
 * @param condition the condition to wait for
 * @param timeout   the timeout value
 * @param timeUnit  the unit for the timeout
 * @param frequency the frequency of the condition's evaluation in milliseconds
 */
public static void until(Callable<Boolean> condition, long timeout, TimeUnit timeUnit, long frequency) {
    FutureTask<Void> futureTask = new FutureTask<Void>(() -> {
        while (!condition.call()) {
            Thread.sleep(frequency);
        }
        return null;
    });

    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
        executor.submit(futureTask);
        futureTask.get(timeout, timeUnit);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        futureTask.cancel(true);
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        Assert.fail(e.getMessage());
    } finally {
        // Without this the pool's non-daemon worker thread would outlive every call.
        executor.shutdownNow();
    }
}
/**
 * Starts the given echo server on a randomly chosen port and fires 100
 * client threads at it, each sending a distinct message. Every client must
 * receive exactly the message it sent.
 *
 * @param server the echo server implementation under test
 */
private void _test(EchoServer server) throws Exception {
    final int port = new Random().nextInt(60000) + 1000;
    server.start(port);

    final int clientCount = 100;
    Future[] pending = new Future[clientCount];
    for (int index = 0; index < clientCount; index++) {
        final String payload = "task-" + index;
        FutureTask<Boolean> echo = new FutureTask<>(() -> {
            String reply = EchoClient.sendAndReceive(port, payload);
            return reply.equals(payload);
        });
        pending[index] = echo;
        Thread client = new Thread(echo);
        client.start();
    }

    for (Future<Boolean> outcome : pending) {
        Assert.assertTrue(outcome.get());
    }
}
/**
 * Builds a task that first runs the reformat task and then the
 * optimize-imports task for the given file.
 *
 * <p>The combined task yields {@code false} as soon as reformatting yields
 * {@code false} or was cancelled. Sequential processing is disallowed while
 * the optimize-imports task runs and re-allowed afterwards in all cases.
 *
 * @param file                   the file to process
 * @param processChangedTextOnly forwarded to the reformat processor only
 * @return the combined, not yet started task
 */
@Override
@NotNull
public FutureTask<Boolean> preprocessFile(@NotNull PsiFile file, boolean processChangedTextOnly) throws IncorrectOperationException {
    final FutureTask<Boolean> reformatTask = myReformatCodeProcessor.preprocessFile(file, processChangedTextOnly);
    final FutureTask<Boolean> optimizeImportsTask = myOptimizeImportsProcessor.preprocessFile(file, false);

    final Callable<Boolean> sequence = new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            reformatTask.run();
            final boolean reformatted = reformatTask.get();
            if (!reformatted || reformatTask.isCancelled()) {
                return false;
            }

            CodeStyleManagerImpl.setSequentialProcessingAllowed(false);
            try {
                optimizeImportsTask.run();
                final boolean optimized = optimizeImportsTask.get();
                return optimized && !optimizeImportsTask.isCancelled();
            } finally {
                CodeStyleManagerImpl.setSequentialProcessingAllowed(true);
            }
        }
    };
    return new FutureTask<Boolean>(sequence);
}
/**
 * Warms up the indicated namespaces.
 *
 * @param namespaces
 *            A list of zero or more namespaces to be warmed up (optional).
 *            When <code>null</code> or empty, all namespaces will be warmed
 *            up.
 *
 * @return A future for the task that is warming up the indices associated
 *         with those namespace(s). The future evaluates to a map from the
 *         name of the index to the statistics collected for that index
 *         during the warmup procedure.
 *
 * @see <a href="http://trac.bigdata.com/ticket/1050" > pre-heat the journal
 *      on startup </a>
 *
 * @see WarmUpTask
 */
public Future<Map<String, BaseIndexStats>> warmUp(
        final List<String> namespaces) {

    /*
     * Number of index scans that execute in parallel (one thread per
     * index). Each thread blocks on IO, so a modestly large value is
     * needed to enqueue enough disk reads to drive enough IOPs for an
     * efficient disk scan.
     */
    final int parallelScans = 20;

    final WarmUpTask warmUpTask = new WarmUpTask(this, namespaces,
            ITx.READ_COMMITTED/* timestamp */, parallelScans, false/* visitLeaves */);

    final FutureTask<Map<String, BaseIndexStats>> pending =
            new FutureTask<Map<String, BaseIndexStats>>(warmUpTask);

    getExecutorService().submit(pending);

    return pending;
}
/**
 * Executes the given GetFeatureOfInterest tasks one after another and
 * feeds each result to a {@code GetFeatureOfInterestParser}.
 *
 * <p>Each task is waited for at most {@code metadata.getTimeout()}
 * milliseconds; a timeout is logged and processing continues with the next
 * task.
 *
 * @param getFoiAccessTasks tasks keyed by procedure id
 * @param metadata          supplies the timeout and is passed to the parser
 */
private void executeFoiTasks(Map<String, FutureTask<OperationResult>> getFoiAccessTasks, SOSMetadata metadata) throws InterruptedException,
        ExecutionException,
        XmlException,
        IOException,
        OXFException {
    int remaining = getFoiAccessTasks.size();
    LOGGER.debug("Sending {} GetFeatureOfInterest requests", remaining);

    for (Map.Entry<String, FutureTask<OperationResult>> entry : getFoiAccessTasks.entrySet()) {
        String procedureID = entry.getKey();
        LOGGER.debug("Sending #{} GetFeatureOfInterest request for procedure '{}'", remaining--, procedureID);

        FutureTask<OperationResult> foiTask = entry.getValue();
        AccessorThreadPool.execute(foiTask);
        try {
            OperationResult result = foiTask.get(metadata.getTimeout(), MILLISECONDS);
            new GetFeatureOfInterestParser(result, metadata).createFeatures();
        } catch (TimeoutException e) {
            LOGGER.error("Timeout occured.", e);
        }
    }
}
/**
 * Try to discover what is hidden under a FutureTask (hack).
 *
 * <p>Uses reflection on JDK internals. Two field layouts are supported:
 * the older one where the callable lives in a nested {@code sync} object,
 * and the newer one where {@code callable} is a field of
 * {@link FutureTask} itself. When the callable is a
 * {@code RunnableAdapter}, the wrapped {@code task} is returned instead.
 *
 * @param futureTask the task to inspect
 * @return the callable or runnable the task was created with, or
 *         {@code null} when the task no longer holds one
 * @throws RuntimeException wrapping any reflection failure, e.g. when the
 *         runtime does not permit access to the JDK's private fields
 */
public static Object extractUnderlyingCallable(FutureTask<?> futureTask) {
    try {
        Object holder;
        Class<?> holderType;
        try {
            // Older layout: FutureTask delegates to a private "sync" object.
            Field syncField = FutureTask.class.getDeclaredField("sync");
            syncField.setAccessible(true);
            holder = syncField.get(futureTask);
            holderType = holder.getClass();
        } catch (NoSuchFieldException noSync) {
            // Newer layout: the callable is stored directly on FutureTask.
            holder = futureTask;
            holderType = FutureTask.class;
        }

        Field callableField = holderType.getDeclaredField("callable");
        callableField.setAccessible(true);
        Object callable = callableField.get(holder);
        if (callable == null) {
            // Nothing left to unwrap (the task does not hold a callable any more).
            return null;
        }

        if ("RunnableAdapter".equals(callable.getClass().getSimpleName())) {
            Field taskField = callable.getClass().getDeclaredField("task");
            taskField.setAccessible(true);
            return taskField.get(callable);
        }
        return callable;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Races {@code ScheduledRunnable.call()} against
 * {@code ScheduledRunnable.setFuture(...)} 500 times, each round with a
 * fresh runnable registered in a fresh {@code CompositeDisposable}.
 */
@Test
public void runFuture() {
    for (int round = 0; round < 500; round++) {
        CompositeDisposable parent = new CompositeDisposable();
        final ScheduledRunnable scheduled = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, parent);
        parent.add(scheduled);

        final FutureTask<Void> future = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);

        Runnable invoker = new Runnable() {
            @Override
            public void run() {
                scheduled.call();
            }
        };
        Runnable futureSetter = new Runnable() {
            @Override
            public void run() {
                scheduled.setFuture(future);
            }
        };

        TestCommonHelper.race(invoker, futureSetter);
    }
}
/**
 * Constructs this {@code SwingWorker}.
 *
 * <p>The worker starts in the {@code PENDING} state. Its future moves the
 * state to {@code STARTED} right before calling {@code doInBackground()},
 * and on completion calls {@code doneEDT()} followed by a switch to
 * {@code DONE}.
 */
public SwingWorker() {
    future = new FutureTask<T>(
            new Callable<T>() {
                public T call() throws Exception {
                    setState(StateValue.STARTED);
                    return doInBackground();
                }
            }) {
        @Override
        protected void done() {
            doneEDT();
            setState(StateValue.DONE);
        }
    };

    state = StateValue.PENDING;
    propertyChangeSupport = new SwingWorkerPropertyChangeSupport(this);
    doProcess = null;
    doNotifyProgressChange = null;
}
/**
 * Waits for every task in the list to complete.
 *
 * <p>If the waiting thread is interrupted, its interrupt flag is restored
 * before returning.
 *
 * @param taskList the tasks to wait for, in order
 * @return {@code true} when all tasks completed normally, {@code false} as
 *         soon as waiting for one of them raised an exception
 */
private boolean checkThreadCompletion(final List<FutureTask> taskList)
{
  try
  {
    for (FutureTask<?> futureTask : taskList)
    {
      futureTask.get();
    }
  }
  catch (Exception e)
  {
    if (e instanceof InterruptedException)
    {
      // Do not hide the interruption from the caller.
      Thread.currentThread().interrupt();
    }
    ISF_LOGGER.error("task execution error: {}", e.toString());
    e.printStackTrace();
    return false;
  }
  return true;
}
/**
 * Creates a task that lets {@code finder} parse the given class file and
 * registers it in {@code tasks}. The task is not started here.
 *
 * @param cf the class file to parse
 * @return the newly created task
 */
private FutureTask<Void> getTask(final ClassFile cf) {
    Callable<Void> parseJob = new Callable<Void>() {
        public Void call() throws Exception {
            finder.parse(cf);
            return null;
        }
    };
    FutureTask<Void> parseTask = new FutureTask<Void>(parseJob);
    tasks.add(parseTask);
    return parseTask;
}
/**
 * Writes the message and blocks until its task yields a result.
 *
 * <p>A task that looks the message id up in {@code messageStore} is
 * registered under that id before the message is written. The wait is
 * unbounded when {@code timeout} is {@code null} or zero, otherwise it is
 * limited to {@code timeout} milliseconds.
 *
 * @param message the message to send
 * @param chunks  the chunks the message was split into
 * @return the value produced by the registered task
 * @throws ArangoDBException wrapping any failure while waiting; if the
 *         thread was interrupted its interrupt flag is restored first
 */
public Message write(final Message message, final Collection<Chunk> chunks) throws ArangoDBException {
    final FutureTask<Message> task = new FutureTask<>(() -> messageStore.get(message.getId()));
    messageStore.storeMessage(message.getId(), task);
    super.writeIntern(message, chunks);
    try {
        return timeout == null || timeout == 0L ? task.get() : task.get(timeout, TimeUnit.MILLISECONDS);
    } catch (final InterruptedException e) {
        // Preserve the interruption for the caller before translating it.
        Thread.currentThread().interrupt();
        throw new ArangoDBException(e);
    } catch (final Exception e) {
        throw new ArangoDBException(e);
    }
}
/**
 * Creates an operator state backend whose {@code snapshot(...)} always
 * returns a future that fails when it is run.
 *
 * <p>The backend is produced by an anonymous {@code DefaultOperatorStateBackendBuilder}
 * whose {@code build()} is overridden to return an anonymous
 * {@code DefaultOperatorStateBackend} constructed with empty maps and a
 * mocked {@code AbstractSnapshotStrategy}. {@code operatorIdentifier} is
 * not used by this method.
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
true,
stateHandles,
cancelStreamRegistry) {
// Replace the regular build step with one that yields the failing backend below.
@Override
@SuppressWarnings("unchecked")
public DefaultOperatorStateBackend build() {
return new DefaultOperatorStateBackend(
executionConfig,
cancelStreamRegistry,
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
mock(AbstractSnapshotStrategy.class)
) {
// The returned task is not started here; the exception surfaces only once it is run.
@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception {
return new FutureTask<>(() -> {
throw new Exception("Async part snapshot exception.");
});
}
};
}
}.build();
}
/**
 * Runs this object as a task on {@code executor} and waits for it. A
 * non-null exception produced by the task is rethrown wrapped in a
 * {@link RuntimeException}.
 */
public void check() throws Exception {
    final FutureTask<Exception> pending = new FutureTask<>(this);
    executor.submit(pending);

    final Exception failure = pending.get();
    if (failure == null) {
        return;
    }
    throw new RuntimeException("Check failed: "+failure,failure);
}
/**
 * Issues a request to the "one" endpoint on a pool thread, runs
 * {@code testAbsolutePathTwo()} concurrently on a separate thread, and
 * then checks that the first request still reports its own absolute
 * request URI.
 *
 * <p>The pool is shut down when the test finishes, whether it passes or
 * fails.
 */
@Test
public void testAbsolutePathOneAndTwo() throws Exception {
    final String requestURI = "http://localhost:" + PORT + "/one/bookstore/request?delay";
    Callable<String> callable = new Callable<String>() {
        public String call() {
            WebClient wc = WebClient.create(requestURI);
            return wc.accept("text/plain").get(String.class);
        }
    };
    FutureTask<String> task = new FutureTask<>(callable);
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
        executor.execute(task);
        Thread.sleep(1000);

        Runnable runnable = new Runnable() {
            public void run() {
                try {
                    testAbsolutePathTwo();
                } catch (Exception ex) {
                    // Keep the original failure attached for diagnosis.
                    throw new RuntimeException("Concurrent testAbsolutePathTwo failed", ex);
                }
            }
        };
        new Thread(runnable).start();
        Thread.sleep(2000);

        String path = task.get();
        assertEquals("Absolute RequestURI is wrong", requestURI, path);
    } finally {
        // Release the pool's worker thread; it is not needed after the test.
        executor.shutdownNow();
    }
}
/**
 * Resets {@code opCounts} and hands {@code count} new
 * {@code MongoCompositeTest} callables to {@code MongoExecutor}.
 *
 * @param count number of callables to create; each receives its index
 * @return the tasks returned by the executor, in creation order
 */
private List<FutureTask<Boolean>> processOperationsInFuture(int count) {
    List<FutureTask<Boolean>> submitted = new ArrayList<FutureTask<Boolean>>(count);
    this.opCounts = new CopyOnWriteArrayList<Pair<String, Integer>>();

    int index = 0;
    while (index < count) {
        Callable<Boolean> operation = new MongoCompositeTest(index, size, chunkSize, da, dataRecord, this.opCounts, this.operationsEnabled, this.profiledCollectionName);
        submitted.add(MongoExecutor.execute(operation));
        index++;
    }
    return submitted;
}
/**
 * Shows a dialog for choosing another Java platform and, when confirmed,
 * posts a task that applies the selected platform to the project.
 *
 * <p>If the dialog is not confirmed, an already completed
 * {@code UNRESOLVED} result is returned. The posted task yields
 * {@code RESOLVED} after setting the platform and updating the class path
 * extension under the project manager's write access, or
 * {@code UNRESOLVED} when no platform was selected.
 *
 * @return a future for the resolution result
 */
@NbBundle.Messages({"LBL_ResolveFXJDK=Choose FX-enabled Java Platform - \"{0}\" Project"})
@Override
public Future<Result> resolve() {
    final ChooseOtherPlatformPanel choosePlatform = new ChooseOtherPlatformPanel(type);
    final DialogDescriptor dd = new DialogDescriptor(choosePlatform, Bundle.LBL_ResolveFXJDK(ProjectUtils.getInformation(project).getDisplayName()));

    if (DialogDisplayer.getDefault().notify(dd) != DialogDescriptor.OK_OPTION) {
        return new JFXProjectProblems.Done(
                Result.create(ProjectProblemsProvider.Status.UNRESOLVED));
    }

    final Callable<ProjectProblemsProvider.Result> applyPlatform =
            new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    final JavaPlatform selected = choosePlatform.getSelectedPlatform();
                    if (selected == null) {
                        return ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.UNRESOLVED);
                    }
                    try {
                        ProjectManager.mutex().writeAccess(new Mutex.ExceptionAction<Void>() {
                            @Override
                            public Void run() throws IOException {
                                platformSetter.setProjectPlatform(selected);
                                JFXProjectUtils.updateClassPathExtension(project);
                                return null;
                            }
                        });
                    } catch (MutexException e) {
                        throw (IOException) e.getCause();
                    }
                    LOGGER.info("Set " + PLATFORM_ACTIVE + " to platform " + selected);
                    return ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED);
                }
            };
    final RunnableFuture<Result> pending = new FutureTask<Result>(applyPlatform);
    RP.post(pending);
    return pending;
}
/**
 * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
 *
 * <p>Sets up the worker, which marks the task as invoked, switches the
 * thread to background priority and posts the result of
 * {@code doInBackground}, and the future wrapping it, which posts the
 * result on completion if the worker has not done so already.
 */
public AsyncTask() {
    mWorker = new WorkerRunnable<Params, Result>() {
        @Override
        public Result call() throws Exception {
            mTaskInvoked.set(true);

            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            return postResult(doInBackground(mParams));
        }
    };

    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                final Result outcome = get();
                postResultIfNotInvoked(outcome);
            } catch (CancellationException cancelled) {
                // A cancelled task still reports, with a null result.
                postResultIfNotInvoked(null);
            } catch (ExecutionException failed) {
                throw new RuntimeException("An error occured while executing doInBackground()",
                        failed.getCause());
            } catch (InterruptedException interrupted) {
                android.util.Log.w(LOG_TAG, interrupted);
            }
        }
    };
}
/**
 * Fills the given operator stats with per-port output counts, CPU time
 * used since the previous sample and, when available, checkpoint
 * information, then reports them through {@code context}.
 *
 * @param stats    the stats object to populate and report
 * @param windowId the window id passed along with the report
 */
protected void reportStats(ContainerStats.OperatorStats stats, long windowId)
{
stats.outputPorts = new ArrayList<>();
// One PortStats entry per output sink; control tuples are subtracted from the sink's count.
for (Entry<String, Sink<Object>> e : outputs.entrySet()) {
ContainerStats.OperatorStats.PortStats portStats = new ContainerStats.OperatorStats.PortStats(e.getKey());
// NOTE(review): getCount(true) presumably resets the sink's counter - confirm against Sink.
portStats.tupleCount = e.getValue().getCount(true) - controlTupleCount;
portStats.endWindowTimestamp = endWindowEmitTime;
stats.outputPorts.add(portStats);
}
controlTupleCount = 0;
// CPU time is reported as the delta since the previous call.
long currentCpuTime = tmb.getCurrentThreadCpuTime();
stats.cpuTimeUsed = currentCpuTime - lastSampleCpuTime;
lastSampleCpuTime = currentCpuTime;
if (checkpoint != null) {
// A checkpoint is already recorded on this object: report it once and clear it.
stats.checkpoint = checkpoint;
stats.checkpointStats = checkpointStats;
checkpointStats = null;
checkpoint = null;
} else {
// Otherwise look at the head of the task queue and consume it only if it has finished.
Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo> pair = taskQueue.peek();
if (pair != null && pair.getFirst().isDone()) {
taskQueue.poll();
try {
CheckpointWindowInfo checkpointWindowInfo = pair.getSecond();
stats.checkpointStats = pair.getFirst().get();
stats.checkpoint = new Checkpoint(checkpointWindowInfo.windowId, checkpointWindowInfo.applicationWindowCount,
checkpointWindowInfo.checkpointWindowCount);
// Notify operators that asked to hear about completed checkpoints.
if (operator instanceof Operator.CheckpointListener) {
((Operator.CheckpointListener)operator).checkpointed(checkpointWindowInfo.windowId);
}
} catch (Exception ex) {
throw Throwables.propagate(ex);
}
}
}
context.report(stats, windowId);
}
/**
 * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
 *
 * <p>Sets up the worker, which marks the task as invoked, switches the
 * thread to background priority and posts the result of
 * {@code doInBackground}, and the future wrapping it, which posts the
 * result on completion if the worker has not done so already.
 */
public AsyncTask() {
    mWorker = new WorkerRunnable<Params, Result>() {
        @Override
        public Result call() throws Exception {
            mTaskInvoked.set(true);

            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            return postResult(doInBackground(mParams));
        }
    };

    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                final Result outcome = get();
                postResultIfNotInvoked(outcome);
            } catch (CancellationException cancelled) {
                // A cancelled task still reports, with a null result.
                postResultIfNotInvoked(null);
            } catch (ExecutionException failed) {
                throw new RuntimeException("An error occured while executing doInBackground()",
                        failed.getCause());
            } catch (InterruptedException interrupted) {
                android.util.Log.w(LOG_TAG, interrupted);
            }
        }
    };
}
/**
 * Submits a value-returning task for execution on the EDT and
 * returns a Future representing the pending results of the task.
 *
 * @param task the task to submit
 * @return a Future representing pending completion of the task
 * @throws NullPointerException if the task is null
 */
public static <V> Future<V> submit(Callable<V> task) {
    if (task != null) {
        FutureTask<V> pending = new FutureTask<V>(task);
        execute(pending);
        return pending;
    }
    throw new NullPointerException();
}
/**
 * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
 *
 * <p>Sets up the worker, which marks the task as invoked, switches the
 * thread to background priority and posts the result of
 * {@code doInBackground}, and the future wrapping it, which posts the
 * result on completion if the worker has not done so already.
 */
public AsyncTask() {
    mWorker = new WorkerRunnable<Params, Result>() {
        @Override
        public Result call() throws Exception {
            mTaskInvoked.set(true);

            android.os.Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            return postResult(doInBackground(mParams));
        }
    };

    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                final Result outcome = get();
                postResultIfNotInvoked(outcome);
            } catch (CancellationException cancelled) {
                // A cancelled task still reports, with a null result.
                postResultIfNotInvoked(null);
            } catch (ExecutionException failed) {
                throw new RuntimeException("An error occured while executing doInBackground()",
                        failed.getCause());
            } catch (InterruptedException interrupted) {
                android.util.Log.w(LOG_TAG, interrupted);
            }
        }
    };
}
/**
 * Opens a socket channel and starts connecting it to the remote address.
 *
 * <p>If the connection is established immediately, a
 * {@code ConnectionCall} is submitted to the executor service and its
 * future is returned. Otherwise the call is queued in {@code connectQueue},
 * processing is started and the selector is woken up; the returned future
 * task is the one attached to the queued call. If anything fails before the
 * channel has been handed off, the channel is closed and the failure
 * propagates.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the local address used to create the channel
 * @return a future for the connected channel
 * @throws IOException if creating or connecting the channel fails
 */
@Override
protected Future<Channel<byte[]>> connectByProtocol(SocketAddress remoteAddress, SocketAddress localAddress) throws IOException {
    SocketChannel channel = null;
    boolean handedOff = false;
    try {
        channel = newSocketChannel(localAddress);
        boolean connectedNow = channel.connect(remoteAddress);
        if (connectedNow) {
            // Connected right away (local connection): no need to go through the selector.
            Future<Channel<byte[]>> immediate = executorService.submit(new ConnectionCall(channel));
            handedOff = true;
            LOG.debug("[CRAFT-ATOM-NIO] Established local connection");
            return immediate;
        }
        handedOff = true;
    } finally {
        boolean mustClose = !handedOff && channel != null;
        if (mustClose) {
            try {
                close(channel);
            } catch (IOException e) {
                LOG.warn("[CRAFT-ATOM-NIO] Close exception", e);
            }
        }
    }

    // Connection still pending: let the selector loop finish it.
    ConnectionCall pendingCall = new ConnectionCall(channel);
    FutureTask<Channel<byte[]>> pendingTask = new FutureTask<Channel<byte[]>>(pendingCall);
    pendingCall.setFutureTask(pendingTask);
    connectQueue.add(pendingCall);
    startup();
    selector.wakeup();
    return pendingTask;
}