下面列出了 com.google.common.util.concurrent.MoreExecutors#getExitingExecutorService() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds the SQS client described by {@code configuration}.
 *
 * <p>A configuration of type {@code MOCK} yields a {@link MockAsyncSqsClient}. Any other type
 * yields an {@link AsyncSqsClientImpl} whose work runs on a worker pool wrapped by
 * {@code MoreExecutors.getExitingExecutorService}.
 *
 * @param configuration client settings; annotated {@code @NonNull}
 * @return the client matching the configured type
 */
private AsyncSqsClient createClient(@NonNull final AsyncSqsClientConfiguration configuration) {
  // Guard clause: the mock variant needs no real client and no executor.
  if (configuration.getType().equals(AsyncSqsClientConfiguration.Type.MOCK)) {
    return new MockAsyncSqsClient();
  }

  final SqsAsyncClient sqsClient = getAsyncSQSClient(configuration);

  // Core size and maximum size are the same value, and the work queue has no explicit capacity.
  final ThreadPoolExecutor workerPool =
      new ThreadPoolExecutor(
          configuration.getConsumerConcurrency(),
          configuration.getConsumerConcurrency(),
          0L,
          TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(),
          new ThreadFactoryBuilder().setNameFormat("conveyor-executor-%d").build());

  return new AsyncSqsClientImpl(
      sqsClient,
      metrics,
      MoreExecutors.getExitingExecutorService(workerPool),
      configuration.getMaxUrlCacheSize(),
      configuration.getReceiveWaitSeconds(),
      configuration.getBulkheadMaxWaitMillis(),
      configuration.getConsumerConcurrency());
}
/**
 * Creates a downloader, stores its collaborators and builds the thread pool used for processing.
 *
 * <p>The pool size comes from {@code CacheConfig.getRemoteFetchThreads(conf)} and the disk read
 * buffer size from {@code CacheConfig.getDiskReadBufferSize(conf)}. The pool's threads are named
 * {@code parallel-warmup-N} and are daemon threads; the pool is then wrapped by
 * {@code MoreExecutors.getExitingExecutorService}.
 *
 * @param bookKeeper stored for later use
 * @param metrics metric registry stored for later use
 * @param conf configuration the thread count and buffer size are read from
 * @param remoteFetchProcessor stored for later use
 */
public FileDownloader(BookKeeper bookKeeper, MetricRegistry metrics, Configuration conf, RemoteFetchProcessor remoteFetchProcessor)
{
  // Collaborators handed in by the caller.
  this.bookKeeper = bookKeeper;
  this.remoteFetchProcessor = remoteFetchProcessor;
  this.conf = conf;
  this.metrics = metrics;

  // Settings derived from the configuration.
  final int fetchThreads = CacheConfig.getRemoteFetchThreads(conf);
  this.diskReadBufferSize = CacheConfig.getDiskReadBufferSize(conf);

  final ThreadPoolExecutor fixedPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      fetchThreads,
      new ThreadFactoryBuilder().setNameFormat("parallel-warmup-%s").setDaemon(true).build());
  processService = MoreExecutors.getExitingExecutorService(fixedPool);

  // Runs last, after every field above has been assigned.
  initializeMetrics();
}
/**
 * Creates the executor used for migration work.
 *
 * <p>The pool has {@code maxThreads} core and maximum threads, a 120-second keep-alive that also
 * applies to core threads, a bounded queue of {@code maxThreads} entries, and a
 * {@link ThreadPoolExecutor.CallerRunsPolicy} rejection handler. It is returned wrapped by
 * {@code MoreExecutors.getExitingExecutorService} with a termination timeout of
 * {@code AbstractSpringConfigContext.THREAD_POOL_TIME_OUT} seconds.
 *
 * @return the wrapped migration executor
 */
public ExecutorService getMigrationlExecutor() {
  // The original code labelled this configuration "63337" and a disabled alternative "109499"
  // (core size 4, maximum maxThreads, 120 s keep-alive, SynchronousQueue, same thread factory and
  // CallerRunsPolicy). NOTE(review): the numbers look like measurements - confirm what they mean.
  final ArrayBlockingQueue<Runnable> backlog = new ArrayBlockingQueue<>(maxThreads);

  final ThreadPoolExecutor migrationPool = new ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      120L,
      TimeUnit.SECONDS,
      backlog,
      XpipeThreadFactory.create("test-thread"),
      new ThreadPoolExecutor.CallerRunsPolicy());

  // Let idle core threads expire after the keep-alive as well.
  migrationPool.allowCoreThreadTimeOut(true);

  return MoreExecutors.getExitingExecutorService(
      migrationPool, AbstractSpringConfigContext.THREAD_POOL_TIME_OUT, TimeUnit.SECONDS);
}
/**
 * Creates the executor used for migration work.
 *
 * <p>The pool has {@code maxThreads} core and maximum threads, a 120-second keep-alive that also
 * applies to core threads, a bounded queue of 128 entries, threads created by
 * {@code XpipeThreadFactory.create(MIGRATION_EXECUTOR)}, and a
 * {@link ThreadPoolExecutor.CallerRunsPolicy} rejection handler. It is returned wrapped by
 * {@code MoreExecutors.getExitingExecutorService} with a termination timeout of
 * {@code AbstractSpringConfigContext.THREAD_POOL_TIME_OUT} seconds.
 *
 * @return the wrapped migration executor
 */
public ExecutorService getMigrationlExecutor() {
  // The original code labelled this configuration "63337" and a disabled alternative "109499"
  // (core size 4, maximum maxThreads, 120 s keep-alive, SynchronousQueue, same thread factory and
  // CallerRunsPolicy). NOTE(review): the numbers look like measurements - confirm what they mean.
  final ArrayBlockingQueue<Runnable> backlog = new ArrayBlockingQueue<>(128);

  final ThreadPoolExecutor migrationPool = new ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      120L,
      TimeUnit.SECONDS,
      backlog,
      XpipeThreadFactory.create(MIGRATION_EXECUTOR),
      new ThreadPoolExecutor.CallerRunsPolicy());

  // Let idle core threads expire after the keep-alive as well.
  migrationPool.allowCoreThreadTimeOut(true);

  return MoreExecutors.getExitingExecutorService(
      migrationPool, AbstractSpringConfigContext.THREAD_POOL_TIME_OUT, TimeUnit.SECONDS);
}
/**
 * Creates a reporter from the supplied builder and registers it as a notification target on the
 * builder's context.
 *
 * <p>Immediate reports run on a single-thread pool whose threads are named
 * {@code EventReporter-<name>-N}; the pool is wrapped by
 * {@code MoreExecutors.getExitingExecutorService} with a five-minute termination timeout.
 *
 * @param builder supplies the context, name, filter, rate unit and duration unit
 */
public EventReporter(Builder builder) {
  super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);

  this.closer = Closer.create();
  this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
      (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
          ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
      5, TimeUnit.MINUTES);
  this.metricContext = builder.context;

  // Assign the queue BEFORE registering the callback below. The callback captures `this`, so a
  // notification delivered while the constructor is still running would otherwise reach
  // notificationCallback on a reporter whose reportingQueue is still null.
  this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);

  // Registered last so that every field above is already set when the first callback fires.
  this.notificationTargetKey = builder.context.addNotificationTarget(new Function<Notification, Void>() {
    @Nullable
    @Override
    public Void apply(Notification notification) {
      notificationCallback(notification);
      return null;
    }
  });
}
/**
 * Provides the singleton consumer for the inbound job queue.
 *
 * <p>The consumer is given a single-thread pool (threads named {@code inbound-consumer-N}) wrapped
 * by {@code MoreExecutors.getExitingExecutorService}, and the queue name from
 * {@code sqsConfiguration.getInboundQueueName()}.
 *
 * @param asyncClient SQS client bound under {@code Constants.SQS_CONSUMER}
 * @param scheduleManager passed through to the consumer
 * @return the configured consumer
 */
@Provides
@Singleton
public ManagedInboundJobQueueConsumer providesManagedInboundJobQueueConsumer(
    @Named(Constants.SQS_CONSUMER) AsyncSqsClient asyncClient, ScheduleManager scheduleManager) {
  final ThreadPoolExecutor singleWorker =
      (ThreadPoolExecutor)
          Executors.newFixedThreadPool(
              1, new ThreadFactoryBuilder().setNameFormat("inbound-consumer-%d").build());

  return new ManagedInboundJobQueueConsumer(
      asyncClient,
      MoreExecutors.getExitingExecutorService(singleWorker),
      scheduleManager,
      sqsConfiguration.getInboundQueueName());
}
/**
 * Declares the migration executor bean, registered under the name {@code MIGRATION_EXECUTOR}.
 *
 * <p>The pool has {@code maxThreads} core and maximum threads, a 120-second keep-alive that also
 * applies to core threads, a bounded queue of {@code maxThreads / 2} entries, and a
 * {@link ThreadPoolExecutor.CallerRunsPolicy} rejection handler. It is returned wrapped by
 * {@code MoreExecutors.getExitingExecutorService} with a termination timeout of
 * {@code AbstractSpringConfigContext.THREAD_POOL_TIME_OUT} seconds.
 *
 * @return the wrapped migration executor
 */
@Bean(name = MIGRATION_EXECUTOR)
public ExecutorService getMigrationlExecutor() {
  // Bounded backlog sized at half the thread count.
  final ArrayBlockingQueue<Runnable> backlog = new ArrayBlockingQueue<>(maxThreads/2);

  final ThreadPoolExecutor migrationPool = new ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      120L,
      TimeUnit.SECONDS,
      backlog,
      XpipeThreadFactory.create(MIGRATION_EXECUTOR),
      new ThreadPoolExecutor.CallerRunsPolicy());

  // Let idle core threads expire after the keep-alive as well.
  migrationPool.allowCoreThreadTimeOut(true);

  return MoreExecutors.getExitingExecutorService(
      migrationPool, AbstractSpringConfigContext.THREAD_POOL_TIME_OUT, TimeUnit.SECONDS);
}
/**
 * A utility method to parallelize loops. Applies the {@link Function} to every element in the {@link List} in
 * parallel by spawning threads. A list containing the result obtained by applying the function is returned. The
 * method is a blocking call and will wait for all the elements in the list to be processed or timeoutInSecs,
 * whichever is earlier.
 * <p>
 * <b>NOTE: The method is an all or none implementation. Meaning, if any of the thread fails, the method will throw an
 * {@link ExecutionException} even if other threads completed successfully</b>
 * </p>
 *
 * <ul>
 * <li>Uses a Fixed thread pool of size threadCount.
 * <li>Uses {@link #shutdownExecutorService(ExecutorService, Optional, long, TimeUnit)} to shutdown the executor
 * service
 * <li>All threads are daemon threads
 * </ul>
 *
 * @param list input list on which the function is applied in parallel
 * @param function to be applied on every element of the list
 * @param threadCount to be used to process the list
 * @param timeoutInSecs to wait for all the threads to complete
 * @param logger an {@link Optional} wrapping a {@link Logger} to be used during shutdown
 *
 * @return a list containing the result obtained by applying the function on each element of the input list in the
 *         same order
 *
 * @throws IllegalArgumentException if input list or function is null
 * @throws ExecutionException <ul>
 *           <li>if any computation threw an exception
 *           <li>if any computation was cancelled
 *           <li>if any thread was interrupted while waiting (the calling thread's interrupt status is restored
 *           before the exception is thrown)
 *           </ul>
 */
public static <F, T> List<T> parallelize(final List<F> list, final Function<F, T> function, int threadCount,
    int timeoutInSecs, Optional<Logger> logger) throws ExecutionException {

  Preconditions.checkArgument(list != null, "Input list can not be null");
  Preconditions.checkArgument(function != null, "Function can not be null");

  final List<T> results = Lists.newArrayListWithCapacity(list.size());
  List<Future<T>> futures = Lists.newArrayListWithCapacity(list.size());

  ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount, ExecutorsUtils.newThreadFactory(logger)), 2,
          TimeUnit.MINUTES);

  // Submit one task per element; futures are kept in input order so results line up with the input list.
  for (final F l : list) {
    futures.add(executorService.submit(new Callable<T>() {
      @Override
      public T call() throws Exception {
        return function.apply(l);
      }
    }));
  }

  ExecutorsUtils.shutdownExecutorService(executorService, logger, timeoutInSecs, TimeUnit.SECONDS);

  for (Future<T> future : futures) {
    try {
      results.add(future.get());
    } catch (InterruptedException e) {
      // Restore the interrupt status before translating the exception, so callers further up the
      // stack can still observe that this thread was interrupted.
      Thread.currentThread().interrupt();
      throw new ExecutionException("Thread interrupted", e);
    }
  }

  return results;
}
/**
 * Creates the shared executor once, before any test in the class runs.
 */
@BeforeClass
public static void setUpExecutor() {
  // Two workers are enough: they are only used to schedule reading from the output and error streams.
  final ThreadPoolExecutor streamReaders = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
  executorService = MoreExecutors.getExitingExecutorService(streamReaders, 1, TimeUnit.SECONDS);
}
/**
 * Demo entry point: submits a task that never finishes to a five-thread pool wrapped by
 * {@code MoreExecutors.getExitingExecutorService} with a 100 ms termination timeout, then returns.
 *
 * @param args unused
 */
public static void main(String... args) {
  final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
  final ExecutorService exitingService = MoreExecutors.getExitingExecutorService(pool, 100, TimeUnit.MILLISECONDS);

  // Typed as Runnable so that submit(Runnable) is chosen; the task spins forever and never completes.
  final Runnable spinForever = () -> {
    for (;;) {
    }
  };
  exitingService.submit(spinForever);
}