下面列出了java.util.concurrent.ThreadPoolExecutor#setRejectedExecutionHandler ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected void initAsyncJobExecutionThreadPool() {
if (threadFactory == null) {
log.warn("A managed thread factory was not found, falling back to self-managed threads");
super.initAsyncJobExecutionThreadPool();
} else {
if (threadPoolQueue == null) {
log.info("Creating thread pool queue of size {}", queueSize);
threadPoolQueue = new ArrayBlockingQueue<Runnable>(queueSize);
}
if (executorService == null) {
log.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, threadPoolQueue, threadFactory);
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executorService = threadPoolExecutor;
}
startJobAcquisitionThread();
}
}
private Executor createExecutor() {
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10,
60L, SECONDS,
new LinkedBlockingQueue<>(2), new DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode()));
threadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
return;
}
try {
if (!tpe.getQueue().offer(r, 20, SECONDS)) {
org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources")
.warning("Default pool executor failed to run asynchronous process: " + r);
}
} catch (final InterruptedException e) {
//Ignore
}
}
});
return threadPoolExecutor;
}
@Override
protected void initAsyncJobExecutionThreadPool() {
if (threadFactory == null) {
LOGGER.warn("A managed thread factory was not found, falling back to self-managed threads");
super.initAsyncJobExecutionThreadPool();
} else {
if (threadPoolQueue == null) {
LOGGER.info("Creating thread pool queue of size {}", queueSize);
threadPoolQueue = new ArrayBlockingQueue<>(queueSize);
}
if (executorService == null) {
LOGGER.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, threadPoolQueue, threadFactory);
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executorService = threadPoolExecutor;
}
startJobAcquisitionThread();
}
}
@Override
protected void doStart() {
// A single thread executor for all events
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Threads.createDaemonThreadFactory("zk-client-EventThread"));
// Just discard the execution if the executor is closed
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
eventExecutor = executor;
try {
zooKeeper.set(createZooKeeper());
} catch (IOException e) {
notifyFailed(e);
}
}
protected ThreadPoolExecutor initThreadPool(AbstractBulkRequest request) {
int taskThreadNum = request.getTaskThreadNum();
int workQueenLength = request.getTaskQueueNum();
ThreadPoolExecutor executor = new ThreadPoolExecutor(taskThreadNum, taskThreadNum, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(workQueenLength));
executor.setRejectedExecutionHandler(new BlockRejectedExecutionHandler());
return executor;
}
public GraphiteHandler(SampleRepository repository, GraphiteInitializer parent) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
int concurrency = Runtime.getRuntime().availableProcessors();
m_executor = new ThreadPoolExecutor(concurrency, concurrency, 0L, MILLISECONDS, queue);
m_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
m_repository = repository;
m_parent = parent;
m_lines = Lists.newArrayList();
LOG.debug("Using storage concurrency of {}", concurrency);
}
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
threadPool.setThreadFactory(new NamedThreadFactory("SEV-" + serverConfig.getProtocol().toUpperCase()
+ "-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
if (serverConfig.isPreStartCore()) { // 初始化核心线程池
threadPool.prestartAllCoreThreads();
}
return threadPool;
}
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
threadPool.setThreadFactory(new NamedThreadFactory(
"SEV-TRIPLE-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
if (serverConfig.isPreStartCore()) { // 初始化核心线程池
threadPool.prestartAllCoreThreads();
}
return threadPool;
}
private ReadaheadPool() {
pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
pool.setThreadFactory(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Readahead Thread #%d")
.build());
}
/**
* TPE with a rejected execution handler specified.
*/
private ThreadPoolExecutor newNamedThreadPool(final int threads, final String poolName, final Endpoint address) {
final ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
newNamedThreadFactory(poolName, address));
tpe.setRejectedExecutionHandler(new BackgroundExecutorRejectionHandler());
return tpe;
}
/**
* @return the executor for UI background processes.
*/
@Bean(name = "uiExecutor")
@ConditionalOnMissingBean(name = "uiExecutor")
public Executor uiExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(20);
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS,
blockingQueue, new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build());
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return new DelegatingSecurityContextExecutor(threadPoolExecutor);
}
/**
* setRejectedExecutionHandler(null) throws NPE
*/
public void testSetRejectedExecutionHandlerNull() {
final ThreadPoolExecutor p =
new ThreadPoolExecutor(1, 2,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
try {
p.setRejectedExecutionHandler(null);
shouldThrow();
} catch (NullPointerException success) {}
}
}
private ReadaheadPool() {
pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
pool.setThreadFactory(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Readahead Thread #%d")
.build());
}
private ReadaheadPool() {
pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
pool.setThreadFactory(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Readahead Thread #%d")
.build());
}
public AredisAsyncService(String redisServerAddress, int minConcurrent, int maxConcurrent, int queueSize,
String password) {
if (maxConcurrent <= 0) {
maxConcurrent = 50;
}
if (minConcurrent <= 0) {
minConcurrent = 10;
}
if (queueSize <= 0) {
queueSize = 100;
}
this.redisServerAddress = redisServerAddress;
if (password != null) {
AsyncRedisFactory.setAuth(redisServerAddress, password);
}
/**
* 初始化线程池
*/
executor = new ThreadPoolExecutor(minConcurrent, maxConcurrent, 15, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
executor.allowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
if (logger.isTraceEnable()) {
logger.info(this,
"AredisAsyncService线程池设置:min=" + minConcurrent + ",max=" + maxConcurrent + ",queue=" + queueSize);
}
factory = new AsyncRedisFactory(executor);
}
@Test
public void countRejections() {
// given
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.setRejectedExecutionHandler((r, executor1) -> {});
CountedRejectedExecutionHandler.monitorRejections(
registry, executor, CountedRejectedExecutionHandler.class.getName(), userTags);
CountDownLatch runnableTaskComplete = new CountDownLatch(1);
Runnable stub =
() -> {
try {
runnableTaskComplete.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new IllegalStateException("runnable interrupted before completion");
}
};
executor.submit(stub);
// then
for (int i = 0; i < 14; i++) {
executor.submit(
() -> {
// do nothing. Task has to be rejected.
});
}
// when
assertEquals(
registry
.get("executor.rejected")
.tags(userTags)
.tag("name", CountedRejectedExecutionHandler.class.getName())
.counter()
.count(),
14.0);
// cleanup
runnableTaskComplete.countDown();
executor.shutdownNow();
}
public ManagedThreadPool(ThreadPoolExecutor pool) {
this.pool = pool;
pool.setRejectedExecutionHandler(
new CountingRejectionHandler(pool.getRejectedExecutionHandler()));
}
/**
* Creates a pool based on the configuration info.
* <p>
* @param config the pool configuration
* @param threadNamePrefix prefix for the thread names of the pool
* @param threadPriority the priority of the created threads
* @return A ThreadPool wrapper
*/
public ExecutorService createPool( PoolConfiguration config, String threadNamePrefix, int threadPriority )
{
BlockingQueue<Runnable> queue = null;
if ( config.isUseBoundary() )
{
log.debug( "Creating a Bounded Buffer to use for the pool" );
queue = new LinkedBlockingQueue<>(config.getBoundarySize());
}
else
{
log.debug( "Creating a non bounded Linked Queue to use for the pool" );
queue = new LinkedBlockingQueue<>();
}
ThreadPoolExecutor pool = new ThreadPoolExecutor(
config.getStartUpSize(),
config.getMaximumPoolSize(),
config.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
queue,
new DaemonThreadFactory(threadNamePrefix, threadPriority));
// when blocked policy
switch (config.getWhenBlockedPolicy())
{
case ABORT:
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
break;
case RUN:
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
break;
case WAIT:
throw new RuntimeException("POLICY_WAIT no longer supported");
case DISCARDOLDEST:
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
break;
default:
break;
}
pool.prestartAllCoreThreads();
return pool;
}
/**
* @param subscriptionTimestamp timestamp of when the index subscription became active (or more accurately, not
* inactive)
* @param listener listeners that will process the events
* @param threadCnt number of worker threads that will handle incoming SEP events
* @param hostName hostname to bind to
* @param payloadExtractor extracts payloads to include in SepEvents
*/
public SepConsumer(String subscriptionId, long subscriptionTimestamp, EventListener listener, int threadCnt,
String hostName, ZooKeeperItf zk, Configuration hbaseConf, PayloadExtractor payloadExtractor) throws IOException, InterruptedException {
Preconditions.checkArgument(threadCnt > 0, "Thread count must be > 0");
this.subscriptionId = SepModelImpl.toInternalSubscriptionName(subscriptionId);
this.subscriptionTimestamp = subscriptionTimestamp;
this.listener = listener;
this.zk = zk;
this.hbaseConf = hbaseConf;
this.sepMetrics = new SepMetrics(subscriptionId);
this.payloadExtractor = payloadExtractor;
this.executors = Lists.newArrayListWithCapacity(threadCnt);
InetSocketAddress initialIsa = new InetSocketAddress(hostName, 0);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
String name = "regionserver/" + initialIsa.toString();
this.rpcServer = new RpcServer(this, name, getServices(),
/*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
initialIsa, // BindAddress is IP we got for this server.
//hbaseConf.getInt("hbase.regionserver.handler.count", 10),
//hbaseConf.getInt("hbase.regionserver.metahandler.count", 10),
hbaseConf,
new FifoRpcScheduler(hbaseConf, hbaseConf.getInt("hbase.regionserver.handler.count", 10)));
/*
new SimpleRpcScheduler(
hbaseConf,
hbaseConf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT),
hbaseConf.getInt("hbase.regionserver.metahandler.count", 10),
hbaseConf.getInt("hbase.regionserver.handler.count", 10),
this,
HConstants.QOS_THRESHOLD)
);
*/
this.serverName = ServerName.valueOf(hostName, rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
this.zkWatcher = new ZooKeeperWatcher(hbaseConf, this.serverName.toString(), null);
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(hbaseConf, "hbase.zookeeper.client.keytab.file",
"hbase.zookeeper.client.kerberos.principal", hostName);
// login the server principal (if using secure Hadoop)
User.login(hbaseConf, "hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", hostName);
for (int i = 0; i < threadCnt; i++) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100));
executor.setRejectedExecutionHandler(new WaitPolicy());
executors.add(executor);
}
}
/**
* Creates and configures the update service for this game sync executor.
* The returned executor is <b>unconfigurable</b> meaning it's configuration
* can no longer be modified.
*
* @param nThreads
* the amount of threads to create this service.
* @return the newly created and configured service.
*/
private ExecutorService create(int nThreads) {
if (nThreads <= 1)
return null;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("GameSyncThread").build());
return Executors.unconfigurableExecutorService(executor);
}