下面列出了 java.util.concurrent.Executor#execute() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Decorates an {@link Executor} so that every task it runs is first wrapped by
 * {@code Callables.threadRenaming(task, nameSupplier)}.
 *
 * <p>According to the contract of this method, the name is obtained from {@code nameSupplier} on
 * the thread being renamed, immediately before each task runs. Renaming is best effort: if a
 * {@link SecurityManager} forbids it, the rename is skipped and the task still executes.
 *
 * <p>When {@code isAppEngine()} reports true the given executor is returned undecorated.
 *
 * @param executor The executor to decorate
 * @param nameSupplier The source of names for each task
 * @return the decorating executor, or {@code executor} itself on AppEngine
 */
@GwtIncompatible // concurrency
static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
  checkNotNull(executor);
  checkNotNull(nameSupplier);
  if (!isAppEngine()) {
    return new Executor() {
      @Override
      public void execute(Runnable task) {
        final Runnable renamingTask = Callables.threadRenaming(task, nameSupplier);
        executor.execute(renamingTask);
      }
    };
  }
  // AppEngine doesn't support thread renaming, so don't even try
  return executor;
}
/**
 * Syncs and saves meta-information of all data structures to page memory.
 *
 * <p>When a snapshot is required for this group, partitions are added first: inline if the
 * context supplies no executor, otherwise as a task submitted to that executor. The actual
 * sync is then delegated to the three-argument overload.
 *
 * @param ctx Context providing the executor and snapshot flags.
 * @throws IgniteCheckedException If failed.
 */
private void syncMetadata(Context ctx) throws IgniteCheckedException {
    Executor execSvc = ctx.executor();

    boolean needSnapshot = ctx.nextSnapshot() && ctx.needToSnapshot(grp.cacheOrGroupName());

    if (needSnapshot && execSvc == null)
        addPartitions(ctx);
    else if (needSnapshot) {
        // Checked failures cannot cross the Runnable boundary, so they are rethrown unchecked.
        Runnable addPartsTask = () -> {
            try {
                addPartitions(ctx);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        };

        execSvc.execute(addPartsTask);
    }

    syncMetadata(ctx, ctx.executor(), needSnapshot);
}
/**
 * Answers a ping by notifying {@code callback} on {@code executor}: with a failure built from
 * the recorded shutdown status when this transport is terminated, otherwise with a success of
 * {@code 0}.
 */
@Override
public synchronized void ping(final PingCallback callback, Executor executor) {
  final Runnable notification;
  if (!terminated) {
    notification = new Runnable() {
      @Override
      public void run() {
        callback.onSuccess(0);
      }
    };
  } else {
    // Snapshot the status while holding the lock; the runnable executes later.
    final Status shutdownStatus = this.shutdownStatus;
    notification = new Runnable() {
      @Override
      public void run() {
        callback.onFailure(shutdownStatus.asRuntimeException());
      }
    };
  }
  executor.execute(notification);
}
/**
 * Hands {@code runnable} to {@code executor}, logging (at {@code SEVERE}) rather than
 * propagating any {@linkplain RuntimeException runtime exception} the executor throws.
 *
 * @param runnable the listener to run
 * @param executor the executor the listener is submitted to
 */
private static void executeListener(Runnable runnable, Executor executor) {
  try {
    executor.execute(runnable);
  } catch (RuntimeException e) {
    // A misbehaving runnable or executor must not affect the other listeners, so the failure is
    // only logged. Errors are deliberately not caught here and keep propagating.
    final String message =
        "RuntimeException while executing runnable " + runnable + " with executor " + executor;
    log.log(Level.SEVERE, message, e);
  }
}
/**
 * Removes stale blobs located directly under the repository root together with every indices path that is no longer
 * referenced by an existing snapshot. Must only be invoked right after a new {@link RepositoryData} has been written to the
 * repository, passing the {@code foundIndices} and {@code rootBlobs} that were listed beforehand.
 *
 * <p>Two cleanup tasks are submitted to the snapshot thread pool; once both have reported, their results are summed and
 * passed to {@code listener}.
 *
 * @param foundIndices all indices blob containers found in the repository before {@code newRepoData} was written
 * @param rootBlobs    all blobs found directly under the repository root
 * @param newRepoData  new repository data that was just written
 * @param listener     listener to invoke with the combined long of all blobs removed in this operation
 */
private void cleanupStaleBlobs(Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs,
                               RepositoryData newRepoData, ActionListener<Long> listener) {
    // Expects exactly two responses: one from the root-blob cleanup, one from the indices cleanup.
    final GroupedActionListener<Long> groupedListener = new GroupedActionListener<>(
        ActionListener.wrap(
            deleteResults -> listener.onResponse(deleteResults.stream().mapToLong(Long::longValue).sum()),
            listener::onFailure),
        2);

    final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

    executor.execute(ActionRunnable.supply(
        groupedListener,
        () -> (long) cleanupStaleRootFiles(staleRootBlobs(newRepoData, rootBlobs.keySet())).size()));

    final Set<String> survivingIndexIds = newRepoData.getIndices()
        .values()
        .stream()
        .map(IndexId::getId)
        .collect(Collectors.toSet());
    executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
}
/**
 * Submits a marker task to {@code executor} and waits for it to run.
 *
 * <p>For an executor that runs tasks in submission order, the marker running implies that
 * everything submitted before it has been processed (this ordering is a property of the
 * executor, not something this method enforces).
 *
 * <p>If the waiting thread is interrupted, {@code false} is returned and the thread's
 * interrupt status is restored so that callers can still observe the interruption.
 *
 * @param executor the executor to flush
 * @param timeout  the maximum time to wait for the marker task
 * @param unit     the unit of {@code timeout}
 * @return {@code true} if the marker task ran within the timeout; {@code false} on timeout,
 *         interruption, or any runtime failure while waiting
 */
public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) {
    final CountDownLatch latch = new CountDownLatch(1);
    executor.execute(latch::countDown);
    try {
        return latch.await(timeout, unit);
    } catch (InterruptedException e) {
        // Do not swallow the interruption: re-assert the flag for the caller.
        Thread.currentThread().interrupt();
        return false;
    } catch (RuntimeException e) {
        // Kept for backward compatibility with the previous broad catch
        // (e.g. a null unit yields false rather than an exception).
        return false;
    }
}
/**
 * Asynchronously looks for {@link Profile} violations in the given classpath and source roots.
 * One validation task per root is submitted to {@code executor}: binary roots are scheduled when
 * {@code check} contains {@code BINARIES_BY_MANIFEST} or {@code BINARIES_BY_CLASS_FILES}, source
 * roots when it contains {@code SOURCES}.
 * @param profileToCheck the {@link Profile} to be verified
 * @param bootClassPath the boot classpath of JDK 8 platform to get the profile info from
 * @param compileClassPath the compile classpath to be validated
 * @param sourcePath the source path to be validated
 * @param check types of validation
 * @param collectorFactory the {@link Violation}s collector
 * @param executor to use for the asynchronous operation, may have higher throughput
 * @throws IllegalArgumentException if the bootClassPath is not a valid JDK 8 boot classpath
 */
public static void findProfileViolations(
        @NonNull final Profile profileToCheck,
        @NonNull final Iterable<URL> bootClassPath,
        @NonNull final Iterable<URL> compileClassPath,
        @NonNull final Iterable<URL> sourcePath,
        @NonNull final Set<Validation> check,
        @NonNull final ViolationCollectorFactory collectorFactory,
        @NonNull final Executor executor) {
    // NOTE(review): bootClassPath is the only @NonNull argument without an explicit check here;
    // presumably Context validates it - confirm.
    Parameters.notNull("profileToCheck", profileToCheck); //NOI18N
    Parameters.notNull("compileClassPath", compileClassPath); //NOI18N
    Parameters.notNull("sourcePath", sourcePath); //NOI18N
    Parameters.notNull("check", check); //NOI18N
    Parameters.notNull("collectorFactory", collectorFactory); //NOI18N
    Parameters.notNull("executor", executor); //NOI18N

    final Context ctx = new Context(profileToCheck, bootClassPath, collectorFactory, check);

    final boolean validateBinaries = check.contains(Validation.BINARIES_BY_MANIFEST)
            || check.contains(Validation.BINARIES_BY_CLASS_FILES);
    if (validateBinaries) {
        for (final URL binaryRoot : compileClassPath) {
            final Runnable binaryValidator = Validator.forBinary(binaryRoot, ctx);
            executor.execute(binaryValidator);
        }
    }

    if (check.contains(Validation.SOURCES)) {
        for (final URL srcRoot : sourcePath) {
            final Runnable sourceValidator = Validator.forSource(srcRoot, ctx);
            executor.execute(sourceValidator);
        }
    }
}
/**
 * Returns a {@link BackgroundCallback} that forwards each result to {@code callback} on
 * {@code executor} instead of on the calling thread. The framework instance passed to the
 * wrapper is ignored; {@code client} is handed to the wrapped callback instead.
 *
 * Exceptions thrown by the wrapped callback are not propagated: the interrupt state is checked,
 * a {@link KeeperException} additionally triggers connection validation on the client, and the
 * failure is logged through the client.
 */
private static BackgroundCallback wrapCallback(final CuratorFrameworkImpl client, final BackgroundCallback callback, final Executor executor)
{
    return new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework dummy, final CuratorEvent event) throws Exception
        {
            final Runnable deliverEvent = new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        callback.processResult(client, event);
                    }
                    catch ( Exception failure )
                    {
                        ThreadUtils.checkInterrupted(failure);
                        if ( failure instanceof KeeperException )
                        {
                            KeeperException keeperFailure = (KeeperException)failure;
                            client.validateConnection(client.codeToState(keeperFailure.code()));
                        }
                        client.logError("Background operation result handling threw exception", failure);
                    }
                }
            };
            executor.execute(deliverEvent);
        }
    };
}
/**
 * Always reports a failure: schedules {@code callback.onFailure} on {@code executor} with an
 * exception created from this transport's {@code error} status.
 */
@Override
public void ping(final PingCallback callback, Executor executor) {
  final Runnable reportFailure = new Runnable() {
    @Override
    public void run() {
      callback.onFailure(error.asException());
    }
  };
  executor.execute(reportFailure);
}
/**
 * Test the maximum number of threads cannot be exceeded.
 *
 * <p>Starts {@code MAX_THREADS * 10} concurrent clients, releases them all at once, and has each
 * one check the server's thread-pool size after a request. Assertion failures raised on the
 * client threads are captured and rethrown on the test thread once all clients are done;
 * without that they would be lost in the pool and the test could never fail.
 */
@Test public void testMaxThreads() throws Exception {
  int clientThreads = MAX_THREADS * 10;
  final java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(clientThreads);
  // First assertion failure observed on any client thread, if any.
  final java.util.concurrent.atomic.AtomicReference<AssertionError> failure =
      new java.util.concurrent.atomic.AtomicReference<AssertionError>();
  // Run many clients to make server reach its maximum number of threads
  final CountDownLatch ready = new CountDownLatch(clientThreads);
  final CountDownLatch start = new CountDownLatch(1);
  try {
    for (int i = 0; i < clientThreads; i++) {
      executor.execute(new Runnable() {
        @Override
        public void run() {
          ready.countDown();
          try {
            start.await();
            assertEquals("a:b\nc:d\n",
                         readOutput(new URL(baseUrl, "/echo?a=b&c=d")));
            int serverThreads = server.webServer.getThreadPool().getThreads();
            assertTrue("More threads are started than expected, Server Threads count: "
                    + serverThreads, serverThreads <= MAX_THREADS);
            System.out.println("Number of threads = " + serverThreads +
                " which is less or equal than the max = " + MAX_THREADS);
          } catch (AssertionError e) {
            // AssertionError is not an Exception; record it so the test thread can rethrow it.
            failure.compareAndSet(null, e);
          } catch (Exception e) {
            // do nothing
          }
        }
      });
    }
    // Start the client threads when they are all ready
    ready.await();
    start.countDown();
    // Wait for every client to finish so that their checks are part of the test outcome.
    executor.shutdown();
    assertTrue("Client threads did not finish in time",
        executor.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS));
  } finally {
    // No-op after a clean termination; otherwise stops stragglers so threads do not leak.
    executor.shutdownNow();
  }
  AssertionError clientFailure = failure.get();
  if (clientFailure != null) {
    throw clientFailure;
  }
}
/**
 * Submits a mock runnable through a capturing decorator around a cached thread pool, with a
 * mock captured state installed in the thread-local, then waits for the pool to terminate and
 * runs the standard capture verifications.
 */
@Test
public void testExecutorCaptures() throws InterruptedException {
  // Setup
  ExecutorService pool = Executors.newCachedThreadPool();
  Executor capturing = StateCapture.capturingDecorator(pool);
  CapturedState capturedState = mock(CapturedState.class);
  Runnable task = mock(Runnable.class);
  ThreadLocalStateCaptor.THREAD_LOCAL.set(capturedState);

  // Exercise
  capturing.execute(task);

  // Let the submitted task finish before verifying
  pool.shutdown();
  pool.awaitTermination(10, TimeUnit.HOURS);

  // Verify
  verifyStandardCaptures(capturedState, task);
}
/**
 * Delivers the stored result to {@code callback}: directly on the calling thread when
 * {@code invokeCompletedDirectly(executor, optimize)} allows it, otherwise as a task submitted
 * to {@code executor}.
 *
 * @return this future
 */
@Override
public ListenableFuture<T> resultCallback(Consumer<? super T> callback, Executor executor,
                                          ListenerOptimizationStrategy optimize) {
  if (!invokeCompletedDirectly(executor, optimize)) {
    executor.execute(() -> callback.accept(result));
    return this;
  }
  callback.accept(result);
  return this;
}
/**
 * Verifies that an idle task manager times out after the configured timeout: the test registers
 * a task manager with a slot manager configured with a short task-manager timeout and expects
 * the release-resource callback to be invoked with that task manager's instance id.
 */
@Test
public void testTaskManagerTimeout() throws Exception {
	final long tmTimeout = 10L;

	// Completed by the resource actions once the slot manager asks to release the resource.
	final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
	final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
		.setReleaseResourceConsumer((instanceID, cause) -> releaseFuture.complete(instanceID))
		.build();
	final ResourceManagerId resourceManagerId = ResourceManagerId.generate();

	final ResourceID resourceID = ResourceID.generate();
	final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(
		resourceID,
		new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());

	// A single slot (index 0) on the task manager.
	final SlotReport slotReport = new SlotReport(
		new SlotStatus(new SlotID(resourceID, 0), new ResourceProfile(1.0, 1)));

	final Executor mainThreadExecutor = TestingUtils.defaultExecutor();

	try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
			.setTaskManagerTimeout(Time.milliseconds(tmTimeout))
			.build()) {
		slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);

		mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));

		final InstanceID released = releaseFuture.get();
		assertThat(released, is(equalTo(taskManagerConnection.getInstanceID())));
	}
}
/**
 * Submits the job worker for asynchronous execution. A worker that names a custom executor is
 * sent there when that executor exists; otherwise (after a warning) it goes to the default
 * executor service, as do workers without an executor name.
 *
 * If the pool rejects the job, it is removed from the active jobs, finished with a
 * {@link ComputeExecutionRejectedException}, and {@code false} is returned.
 *
 * @param jobWorker Job worker.
 * @return {@code True} if job has been submitted to pool.
 */
private boolean executeAsync(GridJobWorker jobWorker) {
    try {
        Executor target;

        if (jobWorker.executorName() == null)
            target = ctx.getExecutorService();
        else {
            target = ctx.pools().customExecutor(jobWorker.executorName());

            if (target == null) {
                LT.warn(log, "Custom executor doesn't exist (local job will be processed in default " +
                    "thread pool): " + jobWorker.executorName());

                target = ctx.getExecutorService();
            }
        }

        target.execute(jobWorker);

        // The counter is conditional on metrics updates being enabled; the metric is always bumped.
        if (metricsUpdateFreq > -1L)
            startedJobsCnt.increment();

        startedJobsMetric.increment();

        return true;
    }
    catch (RejectedExecutionException rejected) {
        // Remove from active jobs.
        removeFromActive(jobWorker);

        // Even if job was removed from another thread, we need to reject it
        // here since job has never been executed.
        IgniteException rejectionErr = new ComputeExecutionRejectedException("Job has been rejected " +
            "[jobSes=" + jobWorker.getSession() + ", job=" + jobWorker.getJob() + ']', rejected);

        if (metricsUpdateFreq > -1L)
            rejectedJobsCnt.increment();

        rejectedJobsMetric.increment();

        jobWorker.finishJob(null, rejectionErr, true);

        return false;
    }
}
/**
 * Drives the SSL handshake for {@code sslEngine} over {@code socketChannel} until the engine
 * reports {@code FINISHED} or {@code NOT_HANDSHAKING}.
 *
 * <p>Delegated tasks requested by the engine ({@code NEED_TASK}) are run inline on the calling
 * thread. Previously a new single-thread executor was created for every handshake and never
 * shut down, leaking a thread per call, and the loop spun on {@code NEED_TASK} until the
 * asynchronous tasks completed.
 *
 * @param socketChannel the channel to exchange handshake data on
 * @param sslEngine     the engine performing the handshake
 * @return {@code true} if the handshake loop completed; {@code false} if either argument is
 *         {@code null}, a wrap/unwrap step reported failure, or the handshake took longer
 *         than 30 seconds
 * @throws IOException           if an I/O operation on the channel fails
 * @throws IllegalStateException if the engine reports an unknown handshake status
 */
public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine sslEngine) throws IOException {
    if (socketChannel == null || sslEngine == null) {
        return false;
    }
    final int appBufferSize = sslEngine.getSession().getApplicationBufferSize();
    final int netBufferSize = sslEngine.getSession().getPacketBufferSize();
    ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
    ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
    ByteBuffer myNetData = ByteBuffer.allocate(netBufferSize);
    ByteBuffer peerNetData = ByteBuffer.allocate(netBufferSize);

    final long startTimeMills = System.currentTimeMillis();

    HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
    while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
            && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
        // Give up on peers that keep the handshake open for too long.
        final long timeTaken = System.currentTimeMillis() - startTimeMills;
        if (timeTaken > 30000L) {
            s_logger.warn("SSL Handshake has taken more than 30s to connect to: " + socketChannel.getRemoteAddress() +
                    ". Please investigate this connection.");
            return false;
        }
        switch (handshakeStatus) {
            case NEED_UNWRAP:
                final HandshakeHolder unwrapResult = doHandshakeUnwrap(socketChannel, sslEngine, peerAppData, peerNetData, appBufferSize);
                // The helper may hand back replacement buffers; keep using what it returns.
                peerAppData = unwrapResult.getAppDataBuffer();
                peerNetData = unwrapResult.getNetDataBuffer();
                if (!unwrapResult.isSuccess()) {
                    return false;
                }
                break;
            case NEED_WRAP:
                final HandshakeHolder wrapResult = doHandshakeWrap(socketChannel, sslEngine, myAppData, myNetData, peerNetData, netBufferSize);
                myNetData = wrapResult.getNetDataBuffer();
                if (!wrapResult.isSuccess()) {
                    return false;
                }
                break;
            case NEED_TASK:
                Runnable task;
                while ((task = sslEngine.getDelegatedTask()) != null) {
                    if (s_logger.isTraceEnabled()) {
                        s_logger.trace("SSL: Running delegated task!");
                    }
                    // Run synchronously: the handshake cannot progress until the task is done.
                    task.run();
                }
                break;
            case FINISHED:
                break;
            case NOT_HANDSHAKING:
                break;
            default:
                throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
        }
        handshakeStatus = sslEngine.getHandshakeStatus();
    }
    return true;
}
/**
 * Creates a new future and submits an {@code AsyncRun} task for it and {@code f} to {@code e}.
 *
 * @param e the executor the task is submitted to
 * @param f the action to run
 * @return the newly created future
 * @throws NullPointerException if {@code f} is null
 */
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    if (f == null) {
        throw new NullPointerException();
    }
    final CompletableFuture<Void> stage = new CompletableFuture<>();
    final AsyncRun asyncTask = new AsyncRun(stage, f);
    e.execute(asyncTask);
    return stage;
}
/**
 * Handles an event that the primary <code>Gateway</code> has added to the queue. This method is
 * called by a primary <code>Gateway</code> (through
 * {@link com.gemstone.gemfire.internal.cache.GatewayImpl.SecondaryGatewayListener#afterCreate}
 * ) so that the secondary <code>Gateway</code> can update its unprocessed events map.
 *
 * The update itself is performed by <code>basicHandlePrimaryEvent</code>, which this method
 * schedules on the current executor while holding <code>listenerObjectLock</code>. If no
 * executor is set, the event is dropped without further processing.
 *
 * As documented for this operation, the event may be seen first by either gateway: if the
 * primary processes it first, the map has no entry yet and one is added so the secondary can
 * later recognize and remove it; if the secondary processes it first, the existing entry can
 * simply be removed.
 *
 * A <code>RejectedExecutionException</code> thrown by the executor propagates to the caller.
 *
 * @param gatewayEvent
 *          The event being processed
 */
protected void handlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
  // Read the field once so the null check and the submission see the same executor.
  final Executor currentExecutor = this.executor;
  synchronized (listenerObjectLock) {
    if (currentExecutor == null) {
      // should mean we are now primary
      return;
    }
    final Runnable handleEvent = new Runnable() {
      public void run() {
        basicHandlePrimaryEvent(gatewayEvent);
      }
    };
    currentExecutor.execute(handleEvent);
  }
}
/**
 * Wraps {@code delegate} so that every runnable is passed through
 * {@code embedVersion(Runnable)} before being executed by the delegate.
 *
 * @param delegate the executor that actually runs the tasks; must not be null
 * @return an executor that forwards wrapped runnables to {@code delegate}
 */
default Executor embedVersion(Executor delegate)
{
    requireNonNull(delegate, "delegate is null");
    return task -> {
        Runnable versioned = embedVersion(task);
        delegate.execute(versioned);
    };
}
/**
 * Arranges for this CompletableFuture to be completed with the value
 * produced by the given Supplier, which is invoked from a task
 * submitted to the given executor.
 *
 * @param supplier a function returning the value to be used
 * to complete this CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @return this CompletableFuture
 * @throws NullPointerException if {@code supplier} or {@code executor} is null
 * @since 9
 */
public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
                                          Executor executor) {
    if (supplier == null) {
        throw new NullPointerException();
    }
    if (executor == null) {
        throw new NullPointerException();
    }
    final AsyncSupply<T> asyncTask = new AsyncSupply<>(this, supplier);
    executor.execute(asyncTask);
    return this;
}
/**
 * Creates a {@link Promise} backed by a task that is immediately submitted to the given executor;
 * the task is constructed from that executor and the given {@link Callable}.
 * @param <U>
 *   the function's return type
 * @param call
 *   a function returning the value to be used to resolve the returned {@link Promise}
 * @param executor
 *   the executor to use for asynchronous execution
 * @return
 *   the new {@link Promise}
 */
public static <U> Promise<U> submit(Callable<U> call, Executor executor) {
    final CompletableTask<U> task = new CompletableTask<>(executor, call);
    // The task is both the unit of work handed to the executor and the promise returned.
    executor.execute(task);
    return task;
}