下面列出了 java.util.concurrent.RejectedExecutionHandler 的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Initializes the service and builds the single-threaded plugin executor.
 *
 * <p>The executor is backed by a bounded {@link ArrayBlockingQueue} of
 * {@code QUEUE_SIZE} entries. When the queue is full, the rejection handler
 * evicts the oldest queued task and hands the new task back to the executor.
 *
 * @param conf configuration forwarded to the superclass initialization
 * @throws Exception if the superclass initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception
{
  super.serviceInit(conf);
  LOG.debug("Creating plugin dispatch queue with size {}", QUEUE_SIZE);
  blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
  RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler()
  {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    {
      if (executor.isShutdown()) {
        // A shut-down executor rejects every task. Re-entering it would only
        // recurse through this handler, evicting one pending task per level,
        // so drop the new task and leave the queued work alone.
        return;
      }
      try {
        // Make room by discarding the oldest queued task.
        blockingQueue.remove();
        // execute(), not submit(): r is already the unit of work the caller
        // handed in; submit() would wrap it in a second FutureTask whose
        // result (and any failure) nobody ever observes.
        executor.execute(r);
      } catch (NoSuchElementException ex) {
        // Ignore no-such element as queue may finish, while this handler is called.
      }
    }
  };
  executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
      blockingQueue, new NameableThreadFactory("PluginExecutorThread", true), rejectionHandler);
}
/**
 * Verifies that passing {@code null} to {@code setRejectedExecutionHandler(..)}
 * does not propagate to {@code createExecutor(..)}: the factory is expected to
 * substitute a default handler.
 */
@Test
@SuppressWarnings("serial")
public void testSettingRejectedExecutionHandlerToNullForcesUseOfDefaultButIsOtherwiseCool() throws Exception {
    ScheduledExecutorFactoryBean factory = new ScheduledExecutorFactoryBean() {
        @Override
        protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            // Assert on the handler itself; the single-argument form only checked
            // that the message literal was non-null and therefore could never fail.
            assertNotNull("Bah; the setRejectedExecutionHandler(..) method must use a default RejectedExecutionHandler if a null arg is passed in.",
                    rejectedExecutionHandler);
            return super.createExecutor(poolSize, threadFactory, rejectedExecutionHandler);
        }
    };
    factory.setScheduledExecutorTasks(new ScheduledExecutorTask[]{
        new NoOpScheduledExecutorTask()
    });
    factory.setRejectedExecutionHandler(null); // the null must not propagate
    factory.afterPropertiesSet();
    factory.destroy();
}
/**
 * Returns a fixed-size {@link ThreadPoolExecutor} backed by a bounded queue.
 * When all workers are busy and the queue is full, the submitting thread
 * blocks until queue space becomes available.
 *
 * <p>A task is rejected with a
 * {@link java.util.concurrent.RejectedExecutionException} when the executor
 * has been shut down, or when the submitting thread is interrupted while
 * waiting for queue space (the interrupt status is restored in that case).
 *
 * @param nThreads thread pool size (used as both core and maximum size)
 * @param queueSize capacity of the work queue
 * @return thread pool executor
 */
public static ExecutorService newBlockingFixedThreadPoolExecutor(int nThreads, int queueSize) {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(queueSize);
    RejectedExecutionHandler blockingRejectedExecutionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                // Nothing will ever drain the queue again; queuing the task would
                // silently lose it (or block forever if the queue is full).
                throw new java.util.concurrent.RejectedExecutionException(
                        "Executor has been shut down");
            }
            try {
                executor.getQueue().put(task);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and report the lost task
                // instead of dropping it silently.
                Thread.currentThread().interrupt();
                throw new java.util.concurrent.RejectedExecutionException(
                        "Interrupted while waiting for space in the work queue", e);
            }
            // The executor may have been shut down while we were blocked in put();
            // if the task is still queued, pull it back out and reject it.
            if (executor.isShutdown() && executor.remove(task)) {
                throw new java.util.concurrent.RejectedExecutionException(
                        "Executor has been shut down");
            }
        }
    };
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS, blockingQueue,
            blockingRejectedExecutionHandler);
}
/**
 * Creates an executor that hands tasks directly to worker threads through a
 * fair {@link SynchronousQueue}. A task that this executor rejects is passed
 * to {@code TEMPORARY_CACHED_THREAD_POOL}, together with the value registered
 * for it in this executor's command-name map, unless this executor has
 * already been shut down.
 *
 * @param corePoolSize core number of threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime idle time before excess threads are reclaimed, in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param prefixName value forwarded to the superclass constructor
 *                   (presumably a thread-name prefix; confirm against the superclass)
 */
public ExceedCallImmediatelyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                     TimeUnit unit, String prefixName) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            new SynchronousQueue<Runnable>(true),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable rejected, ThreadPoolExecutor pool) {
                    // Rejections caused by shutdown are dropped.
                    if (pool.isShutdown()) {
                        return;
                    }
                    TEMPORARY_CACHED_THREAD_POOL.execute(
                            ((RealExecutor) pool).getCommandNameMap().get(rejected), rejected);
                }
            }, prefixName);
}
/**
 * Creates a scheduled thread pool from the given profile.
 *
 * <p>Falls back to {@link ThreadPoolExecutor.CallerRunsPolicy} when the
 * profile supplies no rejection handler. When the profile's maximum pool size
 * is positive, the pool is wrapped in a {@code SizedScheduledExecutorService}
 * limited by the profile's maximum queue size.
 *
 * <p>NOTE(review): the {@code threadFactory} argument is not used here; the
 * pool is built with {@code managedThreadFactory} instead. Confirm that this
 * is intentional.
 *
 * @param profile thread pool settings
 * @param threadFactory thread factory offered by the caller (currently unused)
 * @return the scheduled executor, possibly wrapped
 */
@Override
public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
    RejectedExecutionHandler handler = profile.getRejectedExecutionHandler();
    if (handler == null) {
        handler = new ThreadPoolExecutor.CallerRunsPolicy();
    }

    ScheduledThreadPoolExecutor pool = new RejectableScheduledThreadPoolExecutor(
            profile.getPoolSize(), managedThreadFactory, handler);

    if (profile.getMaxPoolSize() <= 0) {
        return pool;
    }
    return new SizedScheduledExecutorService(pool, profile.getMaxQueueSize());
}
/**
 * Demonstrates a {@link ThreadPoolExecutor} with 10 core threads, up to 20
 * threads in total, a 2-second keep-alive for the extra threads and a bounded
 * queue of 10 entries. Submits 23 tasks, printing a line after each
 * submission, and then shuts the pool down.
 */
public static void testThreadPoolExecutor() {
    // Bounded work queue: at most 10 tasks may wait for a free thread.
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);

    // AbortPolicy is the JDK default handler; it throws
    // RejectedExecutionException for a task the pool cannot accept.
    RejectedExecutionHandler abortPolicy = new ThreadPoolExecutor.AbortPolicy();

    // A custom thread factory is rarely needed; when none is supplied the pool
    // uses the factory returned by Executors.defaultThreadFactory().
    ThreadFactory factory = new MyThreadFactory();

    // 10 core threads, 20 maximum; threads above the core size are reclaimed
    // after 2 seconds without work.
    ThreadPoolExecutor pool =
            new ThreadPoolExecutor(10, 20, 2, TimeUnit.SECONDS, workQueue, factory, abortPolicy);

    int submitted = 0;
    while (submitted < 23) {
        pool.execute(new RunEntity());
        System.out.println("add" + submitted + "========");
        submitted++;
    }
    pool.shutdown();
}
/**
 * Builds the underlying {@link ThreadPoolExecutor} from this object's pool
 * settings and records the executor to expose.
 *
 * <p>The exposed executor is either the pool itself or, when
 * {@code exposeUnconfigurableExecutor} is set, an unconfigurable wrapper
 * around it. The raw pool is always the return value.
 *
 * @param threadFactory factory for the pool's worker threads
 * @param rejectedExecutionHandler handler for tasks the pool cannot accept
 * @return the newly created pool
 */
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    BlockingQueue<Runnable> workQueue = createQueue(this.queueCapacity);
    ThreadPoolExecutor pool = createExecutor(this.corePoolSize, this.maxPoolSize,
            this.keepAliveSeconds, workQueue, threadFactory, rejectedExecutionHandler);

    if (this.allowCoreThreadTimeOut) {
        pool.allowCoreThreadTimeOut(true);
    }

    // Optionally hide the pool's configuration methods from clients.
    if (this.exposeUnconfigurableExecutor) {
        this.exposedExecutor = Executors.unconfigurableExecutorService(pool);
    } else {
        this.exposedExecutor = pool;
    }

    return pool;
}
/**
 * Creates the scheduled executor by reflectively invoking the delegate's
 * {@code createExecutor} method with a traced thread factory, and makes sure
 * the result is a {@code TraceableScheduledExecutorService}.
 *
 * @param poolSize pool size forwarded to the delegate
 * @param threadFactory thread factory; passed through {@code traceThreadFactory} first
 * @param rejectedExecutionHandler rejection handler forwarded to the delegate
 * @return the delegate's executor if it is already traceable, otherwise a
 *         traceable wrapper around it
 */
@Override
public ScheduledExecutorService createExecutor(int poolSize,
        ThreadFactory threadFactory,
        RejectedExecutionHandler rejectedExecutionHandler) {
    Object created = ReflectionUtils.invokeMethod(this.createExecutor, this.delegate,
            poolSize, traceThreadFactory(threadFactory), rejectedExecutionHandler);
    ScheduledExecutorService scheduler = (ScheduledExecutorService) created;

    if (!(scheduler instanceof TraceableScheduledExecutorService)) {
        return new TraceableScheduledExecutorService(this.beanFactory, scheduler);
    }
    return scheduler;
}
/**
 * Creates the pool through {@code newThreadPoolExecutor(..)} and, when
 * {@code preStartAllCoreThreads} is set, starts all core threads before
 * returning it.
 *
 * @param corePoolSize core number of threads
 * @param maxPoolSize maximum number of threads
 * @param keepAliveSeconds keep-alive setting forwarded to the pool factory method
 * @param queue work queue
 * @param threadFactory factory for worker threads
 * @param rejectedExecutionHandler handler for rejected tasks
 * @return the created pool
 */
@Override
protected ThreadPoolExecutor createExecutor(
        int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue,
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    final ThreadPoolExecutor pool = newThreadPoolExecutor(
            corePoolSize, maxPoolSize, keepAliveSeconds, queue, threadFactory, rejectedExecutionHandler);
    if (!preStartAllCoreThreads) {
        return pool;
    }
    pool.prestartAllCoreThreads();
    return pool;
}
/**
 * Creates a thread pool that blocks clients submitting additional tasks if
 * there are already {@code activeTasks} running threads and
 * {@code waitingTasks} tasks waiting in its queue.
 *
 * @param activeTasks maximum number of active tasks
 * @param waitingTasks maximum number of waiting tasks
 * @param keepAliveTime time until threads are cleaned up in {@code unit}
 * @param unit time unit
 * @param prefixName prefix of name for threads
 * @return the blocking executor service wrapping the new pool
 */
public static BlockingThreadPoolExecutorService newInstance(
        int activeTasks,
        int waitingTasks,
        long keepAliveTime, TimeUnit unit,
        String prefixName) {

    // Only waitingTasks entries are normally expected in the queue, but it is
    // sized to buffer every permitted task in case dequeueing lags behind
    // enqueueing.
    final int totalTasks = waitingTasks + activeTasks;
    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(totalTasks);

    // Rejection is not expected to happen; it is only logged.
    final RejectedExecutionHandler logOnReject = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            LOG.error("Could not submit task to executor {}", executor.toString());
        }
    };

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            activeTasks, activeTasks, keepAliveTime, unit,
            taskQueue, newDaemonThreadFactory(prefixName), logOnReject);
    pool.allowCoreThreadTimeOut(true);

    return new BlockingThreadPoolExecutorService(totalTasks, pool);
}
/**
* Creates a new pool and remembers the statistics helper.
*
* @param q work queue handed to the superclass
* @param maxPoolSize maximum pool size; the core pool size is derived from it
*        via {@code getCorePoolSize(maxPoolSize)}
* @param stats statistics helper stored in this instance
* @param tf thread factory handed to the superclass
* @param msTimeout keep-alive time, in milliseconds
* @param reh rejection handler handed to the superclass
**/
public PooledExecutorWithDMStats(BlockingQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) {
super(getCorePoolSize(maxPoolSize), maxPoolSize,
msTimeout, TimeUnit.MILLISECONDS,
q, tf, reh);
// Disabled: core-thread timeout was switched off ("deadcoded for 1.5").
// if (getCorePoolSize() != 0 && getCorePoolSize() == getMaximumPoolSize()) {
// allowCoreThreadTimeOut(true); // deadcoded for 1.5
// }
this.stats = stats;
}
/**
 * Checks that {@code EagerThreadPool} builds an executor matching the URL
 * parameters (core size 1, max size 2, keep-alive 1000 ms, queue capacity 1,
 * {@code AbortPolicyWithReport} rejection handler) and that its worker
 * threads are {@code InternalThread}s whose names start with "demo".
 *
 * <p>Assertions made on the worker thread are captured and re-thrown on the
 * test thread. Otherwise a failure there would go unnoticed and, because the
 * latch would never be counted down, the test would hang.
 */
@Test
public void getExecutor1() throws Exception {
    URL url = URL.valueOf("dubbo://10.20.130.230:20880/context/path?" +
            Constants.THREAD_NAME_KEY + "=demo&" +
            Constants.CORE_THREADS_KEY + "=1&" +
            Constants.THREADS_KEY + "=2&" +
            Constants.ALIVE_KEY + "=1000&" +
            Constants.QUEUES_KEY + "=0");
    ThreadPool threadPool = new EagerThreadPool();
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(url);
    try {
        assertThat(executor, instanceOf(EagerThreadPoolExecutor.class));
        assertThat(executor.getCorePoolSize(), is(1));
        assertThat(executor.getMaximumPoolSize(), is(2));
        assertThat(executor.getKeepAliveTime(TimeUnit.MILLISECONDS), is(1000L));
        assertThat(executor.getQueue().remainingCapacity(), is(1));
        assertThat(executor.getQueue(), Matchers.<BlockingQueue<Runnable>>instanceOf(TaskQueue.class));
        assertThat(executor.getRejectedExecutionHandler(),
                Matchers.<RejectedExecutionHandler>instanceOf(AbortPolicyWithReport.class));

        final CountDownLatch latch = new CountDownLatch(1);
        // Written by the worker before countDown(), read after await(): the
        // latch provides the required visibility.
        final Throwable[] workerFailure = new Throwable[1];
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread thread = Thread.currentThread();
                    assertThat(thread, instanceOf(InternalThread.class));
                    assertThat(thread.getName(), startsWith("demo"));
                } catch (Throwable t) {
                    workerFailure[0] = t;
                } finally {
                    // Always release the test thread, even when an assertion failed.
                    latch.countDown();
                }
            }
        });

        // Bounded wait so that a task which never runs fails the test instead of hanging it.
        assertThat(latch.await(10, TimeUnit.SECONDS), is(true));
        if (workerFailure[0] != null) {
            AssertionError failure = new AssertionError("Assertion failed on the pool thread");
            failure.initCause(workerFailure[0]);
            throw failure;
        }
        assertThat(latch.getCount(), is(0L));
    } finally {
        // Do not leak the pool's threads into other tests.
        executor.shutdown();
    }
}
/**
 * Creates the scheduled executor, applies the remove-on-cancel policy where
 * possible, registers the configured tasks and records the executor to
 * expose.
 *
 * <p>The remove-on-cancel policy can only be applied to a
 * {@link ScheduledThreadPoolExecutor}; for any other implementation a debug
 * message is logged instead. The exposed executor is an unconfigurable
 * wrapper when {@code exposeUnconfigurableExecutor} is set, otherwise the
 * executor itself. The raw executor is always the return value.
 *
 * @param threadFactory factory for the executor's threads
 * @param rejectedExecutionHandler handler for rejected tasks
 * @return the newly created scheduled executor
 */
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    ScheduledExecutorService scheduler =
            createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

    if (this.removeOnCancelPolicy) {
        if (!(scheduler instanceof ScheduledThreadPoolExecutor)) {
            logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
        }
        else {
            ((ScheduledThreadPoolExecutor) scheduler).setRemoveOnCancelPolicy(true);
        }
    }

    // Schedule the configured tasks, if there are any.
    boolean hasTasks = !ObjectUtils.isEmpty(this.scheduledExecutorTasks);
    if (hasTasks) {
        registerTasks(this.scheduledExecutorTasks, scheduler);
    }

    // Optionally hide the executor's configuration methods from clients.
    if (this.exposeUnconfigurableExecutor) {
        this.exposedExecutor = Executors.unconfigurableScheduledExecutorService(scheduler);
    }
    else {
        this.exposedExecutor = scheduler;
    }

    return scheduler;
}
public CountedRejectedExecutionHandler(
RejectedExecutionHandler delegate, MeterRegistry registry, String name, Iterable<Tag> tags) {
this.delegate = delegate;
this.counter =
Counter.builder("executor.rejected")
.tags(Tags.concat(tags, "name", name))
.description("The number of tasks not accepted for execution")
.baseUnit(BaseUnits.TASKS)
.register(registry);
}
/**
 * Creates a fixed-size thread pool with a caller-supplied queue, thread
 * factory and rejection policy.
 *
 * @param corePoolSize number of threads; used as both the core and the maximum pool size
 * @param queue work queue of the pool
 * @param threadFactory factory for the pool's threads
 * @param handler rejection policy
 * @return the thread pool executor
 */
public static ThreadPoolExecutor newFixedThreadPool(int corePoolSize,
                                                    BlockingQueue<Runnable> queue,
                                                    ThreadFactory threadFactory,
                                                    RejectedExecutionHandler handler) {
    // Core and maximum size are equal, so the pool never grows or shrinks.
    final int fixedSize = corePoolSize;
    final long keepAliveMillis = 0;
    return new ThreadPoolExecutor(
            fixedSize, fixedSize, keepAliveMillis, TimeUnit.MILLISECONDS, queue, threadFactory, handler);
}
/**
 * getRejectedExecutionHandler returns the handler most recently installed
 * through setRejectedExecutionHandler
 */
public void testSetRejectedExecutionHandler() {
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool)) {
        final RejectedExecutionHandler expected = new NoOpREHandler();
        pool.setRejectedExecutionHandler(expected);
        final RejectedExecutionHandler actual = pool.getRejectedExecutionHandler();
        assertSame(expected, actual);
    }
}
/**
 * Builds a metric-enabled pool through the {@code ThreadPoolUtil} builder,
 * with a 60-second keep-alive and threads created by a
 * {@code NamedThreadFactory} for {@code name}.
 *
 * @param coreThreads core number of threads
 * @param maxThreads maximum number of threads
 * @param workQueue work queue of the pool
 * @param name pool name, also handed to the thread factory
 * @param handler handler for rejected tasks
 * @return the built executor
 */
private static ExecutorService newPool(final int coreThreads, final int maxThreads,
                                       final BlockingQueue<Runnable> workQueue, final String name,
                                       final RejectedExecutionHandler handler) {
    // The second argument is presumably the daemon flag; confirm against NamedThreadFactory.
    final NamedThreadFactory namedThreads = new NamedThreadFactory(name, true);
    return ThreadPoolUtil.newBuilder()
            .poolName(name)
            .enableMetric(true)
            .coreThreads(coreThreads)
            .maximumThreads(maxThreads)
            .keepAliveSeconds(60L)
            .workQueue(workQueue)
            .threadFactory(namedThreads)
            .rejectedHandler(handler)
            .build();
}
/**
 * Creates the executor by forwarding every argument, unchanged and in the
 * same order, to the superclass constructor.
 *
 * @param corePoolSize core number of threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param workQueue queue holding tasks awaiting execution
 * @param threadFactory factory for new threads
 * @param handler handler for rejected tasks
 */
public ThreadPoolTaskExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
 * Creates the executor with the given pool settings and stores the
 * exception handler for later use.
 *
 * @param corePoolSize core number of threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param workQueue queue holding tasks awaiting execution
 * @param handler handler for rejected tasks
 * @param exceptionHandler handler stored in this instance; it is not passed to the superclass
 */
public ProcessingLibraryThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler, ExceptionHandler exceptionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.exceptionHandler = exceptionHandler;
}
/**
 * Creates an executor with no core threads, an effectively unbounded maximum
 * ({@code Integer.MAX_VALUE}), a 60-second keep-alive and a
 * {@link SynchronousQueue} as hand-off queue.
 *
 * @param threadFactory factory for new threads
 * @param rejectedHandler handler for rejected tasks
 */
public ClingExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedHandler) {
// This is the same as Executors.newCachedThreadPool
super(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory,
rejectedHandler
);
}
/**
 * Builds the {@link ThreadPoolExecutor} from this object's pool settings
 * (keep-alive expressed in seconds), stores it in {@code threadPoolExecutor}
 * and returns it.
 *
 * @param threadFactory factory for the pool's worker threads
 * @param rejectedExecutionHandler handler for tasks the pool cannot accept
 * @return the newly created pool
 */
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    final BlockingQueue<Runnable> workQueue = createQueue(this.queueCapacity);
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            this.corePoolSize,
            this.maxPoolSize,
            this.keepAliveSeconds,
            TimeUnit.SECONDS,
            workQueue,
            threadFactory,
            rejectedExecutionHandler);

    // Only touch the core-thread timeout when it was explicitly requested.
    if (this.allowCoreThreadTimeOut) {
        pool.allowCoreThreadTimeOut(true);
    }

    this.threadPoolExecutor = pool;
    return pool;
}
/**
 * Sets up the two fetch thread pools and the asynchronous HTTP client, then
 * starts the client and calls {@code start()}.
 *
 * <p>Both pools are fixed at twice the number of available processors, use a
 * bounded queue of 2048 entries and silently discard tasks that do not fit.
 * The HTTP client uses 3-second connect and socket timeouts and never follows
 * redirects.
 */
public MetricFetcher() {
    final int poolSize = Runtime.getRuntime().availableProcessors() * 2;
    final long keepAliveMillis = 0;
    final int queueCapacity = 2048;
    // Shared by both pools: overflow tasks are dropped without notice.
    final RejectedExecutionHandler discardOnOverflow = new DiscardPolicy();

    fetchService = new ThreadPoolExecutor(poolSize, poolSize,
            keepAliveMillis, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"), discardOnOverflow);
    fetchWorker = new ThreadPoolExecutor(poolSize, poolSize,
            keepAliveMillis, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"), discardOnOverflow);

    final IOReactorConfig reactorConfig = IOReactorConfig.custom()
            .setConnectTimeout(3000)
            .setSoTimeout(3000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
            .build();

    // Redirects are never followed, whatever the HTTP method.
    final DefaultRedirectStrategy neverRedirect = new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(final String method) {
            return false;
        }
    };

    httpclient = HttpAsyncClients.custom()
            .setRedirectStrategy(neverRedirect)
            .setMaxConnTotal(4000)
            .setMaxConnPerRoute(1000)
            .setDefaultIOReactorConfig(reactorConfig)
            .build();
    httpclient.start();
    start();
}
/**
 * Creates a named executor, registers its task metrics and then installs the
 * rejection handler.
 *
 * @param executorName name stored in this instance
 * @param corePoolSize core number of threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param workQueue queue holding tasks awaiting execution
 * @param threadFactory factory for new threads
 * @param handler handler for rejected tasks; installed through the setter
 *        rather than the superclass constructor
 */
public SessionThreadPoolExecutor(String executorName, int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
// The name is assigned before metrics registration.
// NOTE(review): presumably registerTaskMetrics() reads it - confirm.
this.executorName = executorName;
registerTaskMetrics();
// Replaces the superclass's default handler with the caller's.
this.setRejectedExecutionHandler(handler);
}
/**
 * Constructing with a null rejection handler throws NullPointerException
 */
public void testConstructorNullPointerException7() {
    final RejectedExecutionHandler missingHandler = null;
    try {
        final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
        new CustomTPE(1, 2, 1L, SECONDS, workQueue, new SimpleThreadFactory(), missingHandler);
        shouldThrow();
    } catch (NullPointerException success) {}
}
/**
 * Creates the wrapper around a new {@link ThreadPoolExecutor} built from the
 * given settings. The pool is stored in {@code executorService}, and no
 * thread factory is supplied to it.
 *
 * @param corePoolSize core number of threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param workQueue queue holding tasks awaiting execution
 * @param handler handler for rejected tasks
 */
public MDCThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this.executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
handler);
}
/**
 * Creates the tracing executor around {@code delegate} and looks up, by
 * reflection, the {@link ScheduledThreadPoolExecutor} methods that are
 * stored in this instance's {@code Method} fields. Each method that is found
 * is made accessible.
 *
 * <p>The lookups must use the real JDK signatures:
 * {@code decorateTask(Callable, RunnableScheduledFuture)} and
 * {@code newTaskFor(Callable)}. A lookup with a non-existent name or
 * parameter list yields {@code null}, leaving the handle unusable.
 *
 * @param corePoolSize core pool size passed to the superclass
 * @param threadFactory thread factory passed to the superclass
 * @param handler rejection handler passed to the superclass
 * @param delegate the executor stored as this instance's delegate
 */
LazyTraceScheduledThreadPoolExecutor(
        int corePoolSize,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler,
        ScheduledThreadPoolExecutor delegate) {
    super(corePoolSize, threadFactory, handler);
    this.delegate = delegate;
    this.decorateTaskRunnable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "decorateTask", Runnable.class,
            RunnableScheduledFuture.class);
    makeAccessibleIfNotNull(this.decorateTaskRunnable);
    // The Callable overload is also named "decorateTask"; there is no
    // "decorateTaskCallable" method in the JDK.
    this.decorateTaskCallable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "decorateTask", Callable.class,
            RunnableScheduledFuture.class);
    makeAccessibleIfNotNull(this.decorateTaskCallable);
    // A null parameter-type array matches any signature.
    this.finalize = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "finalize", null);
    makeAccessibleIfNotNull(this.finalize);
    this.beforeExecute = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "beforeExecute", null);
    makeAccessibleIfNotNull(this.beforeExecute);
    this.afterExecute = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "afterExecute", null);
    makeAccessibleIfNotNull(this.afterExecute);
    this.terminated = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class,
            "terminated");
    makeAccessibleIfNotNull(this.terminated);
    this.newTaskForRunnable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "newTaskFor", Runnable.class,
            Object.class);
    makeAccessibleIfNotNull(this.newTaskForRunnable);
    // newTaskFor(Callable) takes a single argument; only the Runnable
    // overload has the extra result parameter.
    this.newTaskForCallable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "newTaskFor", Callable.class);
    makeAccessibleIfNotNull(this.newTaskForCallable);
}
/**
 * Creates a named executor whose rejected tasks are handled by a
 * {@code DefaultRejectedExecutionHandler} bound to this executor.
 *
 * @param corePoolSize core number of threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param runnableQueue queue holding tasks awaiting execution
 * @param threadFactory factory for new threads
 * @param name name stored in this instance
 */
public SynThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> runnableQueue, ThreadFactory threadFactory, String name)
{
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, runnableQueue, threadFactory);
    // The handler needs a reference to this executor, so it can only be
    // installed after the superclass constructor has run.
    this.setRejectedExecutionHandler(new DefaultRejectedExecutionHandler(this));
    this.name = name;
}
/**
 * Creates the tracing executor around {@code delegate} and looks up, by
 * reflection, the {@link ScheduledThreadPoolExecutor} methods that are
 * stored in this instance's {@code Method} fields. Each method that is found
 * is made accessible.
 *
 * <p>The lookups must use the real JDK signatures:
 * {@code decorateTask(Callable, RunnableScheduledFuture)} and
 * {@code newTaskFor(Callable)}. A lookup with a non-existent name or
 * parameter list yields {@code null}, leaving the handle unusable.
 *
 * @param corePoolSize core pool size passed to the superclass
 * @param handler rejection handler passed to the superclass
 * @param delegate the executor stored as this instance's delegate
 */
LazyTraceScheduledThreadPoolExecutor(
        int corePoolSize,
        RejectedExecutionHandler handler,
        ScheduledThreadPoolExecutor delegate) {
    super(corePoolSize, handler);
    this.delegate = delegate;
    this.decorateTaskRunnable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "decorateTask", Runnable.class,
            RunnableScheduledFuture.class);
    makeAccessibleIfNotNull(this.decorateTaskRunnable);
    // The Callable overload is also named "decorateTask"; there is no
    // "decorateTaskCallable" method in the JDK.
    this.decorateTaskCallable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "decorateTask", Callable.class,
            RunnableScheduledFuture.class);
    makeAccessibleIfNotNull(this.decorateTaskCallable);
    // A null parameter-type array matches any signature.
    this.finalize = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "finalize", null);
    makeAccessibleIfNotNull(this.finalize);
    this.beforeExecute = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "beforeExecute", null);
    makeAccessibleIfNotNull(this.beforeExecute);
    this.afterExecute = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "afterExecute", null);
    makeAccessibleIfNotNull(this.afterExecute);
    this.terminated = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
            "terminated", null);
    makeAccessibleIfNotNull(this.terminated);
    this.newTaskForRunnable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "newTaskFor", Runnable.class,
            Object.class);
    makeAccessibleIfNotNull(this.newTaskForRunnable);
    // newTaskFor(Callable) takes a single argument; only the Runnable
    // overload has the extra result parameter.
    this.newTaskForCallable = ReflectionUtils.findMethod(
            ScheduledThreadPoolExecutor.class, "newTaskFor", Callable.class);
    makeAccessibleIfNotNull(this.newTaskForCallable);
}
public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime,
TimeUnit unit, int queueCapacity, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory,
handler);
((ExecutorQueue) getQueue()).setStandardThreadExecutor(this);
submittedTasksCount = new AtomicInteger(0);
// 最大并发任务限制: 队列buffer数 + 最大线程数
maxSubmittedTaskCount = queueCapacity + maxThreads;
}
/**
 * Creates the thread pool: a monitored one when a {@code registry} is
 * configured, otherwise a plain {@link ThreadPoolExecutor}.
 *
 * @param corePoolSize core number of threads
 * @param maxPoolSize maximum number of threads
 * @param keepAliveSeconds keep-alive time for idle threads, in seconds
 * @param queue work queue
 * @param threadFactory factory for worker threads
 * @param rejectedExecutionHandler handler for rejected tasks
 * @return the created pool
 */
private ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    if (registry != null) {
        return newMonitoredExecutorService(corePoolSize, maxPoolSize, keepAliveSeconds, queue, threadFactory, rejectedExecutionHandler);
    }
    // The value is expressed in seconds, so the unit must be SECONDS;
    // MILLISECONDS made the keep-alive a thousand times shorter than configured.
    return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}