下面列出了java.util.concurrent.ThreadPoolExecutor#getQueue ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) {
LOG.error("Thread pool [{}] is exhausted! {}.", threadPoolName, e.toString());
dumpJvmInfoIfNeeded();
if (!e.isShutdown()) {
final BlockingQueue<Runnable> queue = e.getQueue();
int discardSize = queue.size() >> 1;
discardSize = discardSize <= 0 ? 1 : discardSize;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
e.execute(r);
}
}
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) {
LOG.error("Thread pool [{}] is exhausted! {}.", threadPoolName, e.toString());
dumpJvmInfoIfNeeded();
if (r instanceof RejectedRunnable) {
((RejectedRunnable) r).rejected(); // user-defined rejecting
} else {
if (!e.isShutdown()) {
final BlockingQueue<Runnable> queue = e.getQueue();
final int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
try {
queue.put(r);
} catch (final InterruptedException ignored) {
// should not be interrupted
}
}
}
}
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> queue = executor.getQueue();
while (true) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("ThreadPoolExecutor has shut down");
}
try {
if (queue.offer(task, 300, TimeUnit.MILLISECONDS)) {
break;
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
// 舍弃1/2队列元素,例如7个单位的元素,舍弃3个
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
// 从头部移除并返问队列头部的元素
queue.poll();
}
// 添加元素,如果队列满,不阻塞,返回false
queue.offer(runnable);
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("txTransaction Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (runnable instanceof RejectedRunnable) {
((RejectedRunnable) runnable).rejected();
} else {
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void execute() throws Exception {
ExecutorService[] executorServices = Config.resource_node_processPlatformExecutors();
List<String> list = new ArrayList<>();
for (int i = 0; i < executorServices.length; i++) {
ExecutorService service = executorServices[i];
ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
BlockingQueue<Runnable> queue = executor.getQueue();
list.add(String.format("processPlatform executorServices[%d] completed:%d, block:%d.", i,
executor.getCompletedTaskCount(), queue.size()));
if (!queue.isEmpty()) {
List<String> os = new ArrayList<>();
for (Runnable o : queue) {
os.add(o.getClass().toString());
}
list.add(" +++ blocking: " + StringUtils.join(os, ",") + ".");
}
}
System.out.println(StringUtils.join(list, StringUtils.LF));
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if (!(queue instanceof SizeBlockingQueue)) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue) queue).forcePut(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
public void init(Context context, int threadCount, OkHttpClient client) {
setupDatabase(context);
recoveryTaskState();
mClient = client;
mThreadCount = threadCount < 1 ? 1 : threadCount <= EasyConstants.MAX_THREAD_COUNT ? threadCount : EasyConstants.MAX_THREAD_COUNT;
mExecutor = new ThreadPoolExecutor(
mThreadCount,
mThreadCount,
20,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
mCurrentTaskList = new HashMap<>();
mQueue = (LinkedBlockingQueue<Runnable>) mExecutor.getQueue();
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
logger.error("Thread pool [{}] is exhausted! {}.", threadPoolName, e.toString());
dumpJvmInfoIfNeeded();
if (r instanceof RejectedRunnable) {
((RejectedRunnable) r).rejected(); // 交给用户来处理
} else {
if (!e.isShutdown()) {
BlockingQueue<Runnable> queue = e.getQueue();
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
try {
queue.put(r);
} catch (InterruptedException ignored) { /* should not be interrupted */ }
}
}
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if (!(queue instanceof SizeBlockingQueue)) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue) queue).forcePut(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
// 舍弃1/2队列元素,例如7个单位的元素,舍弃3个
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
// 从头部移除并返问队列头部的元素
queue.poll();
}
// 添加元素,如果队列满,不阻塞,返回false
queue.offer(runnable);
}
}
/**
* Closes this appender instance. Before exiting, the implementation tries to
* flush out buffered log events within configured shutdownTimeout seconds. If
* that doesn't finish within configured shutdownTimeout, it would drop all
* the buffered log events.
*/
@Override
public void close() {
ThreadPoolExecutor threadpool = (ThreadPoolExecutor) kinesisClient.getExecutorService();
threadpool.shutdown();
BlockingQueue<Runnable> taskQueue = threadpool.getQueue();
int bufferSizeBeforeShutdown = threadpool.getQueue().size();
boolean gracefulShutdown = true;
try {
gracefulShutdown = threadpool.awaitTermination(shutdownTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// we are anyways cleaning up
} finally {
int bufferSizeAfterShutdown = taskQueue.size();
if (!gracefulShutdown || bufferSizeAfterShutdown > 0) {
String errorMsg = "Kinesis Log4J Appender (" + name + ") waited for " + shutdownTimeout
+ " seconds before terminating but could send only " + (bufferSizeAfterShutdown - bufferSizeBeforeShutdown)
+ " logevents, it failed to send " + bufferSizeAfterShutdown
+ " pending log events from it's processing queue";
LOGGER.error(errorMsg);
errorHandler.error(errorMsg, null, ErrorCode.WRITE_FAILURE);
}
}
kinesisClient.shutdown();
}
public ChainChannelHandler(ChannelHandlerChain chain, ThreadPoolExecutor executor) {
this.chain = chain;
this.executor = executor;
if (executor != null) {
BlockingQueue queue = executor.getQueue();
if (queue instanceof PriorityBlockingQueue && ((PriorityBlockingQueue) queue).comparator() == null) {
runFunc = ComparableRunnable::new;
} else {
runFunc = (r) -> r;
}
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (runnable instanceof RejectedRunnable) {
((RejectedRunnable) runnable).rejected(); // 交给用户来处理
} else {
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
// 舍弃1/2队列元素,例如7个单位的元素,舍弃3个
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
// 从头部移除并返问队列头部的元素
queue.poll();
}
try {
// 添加一个元素, 如果队列满,则阻塞
queue.put(runnable);
} catch (InterruptedException e) {
// should not be interrupted
}
}
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (threadName != null) {
LOG.error("txTransaction Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
}
if (!executor.isShutdown()) {
BlockingQueue<Runnable> queue = executor.getQueue();
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
queue.offer(runnable);
}
}
public void init(Context context, int threadCount, OkHttpClient client) {
mClient = client;
mThreadCount = threadCount < 1 ? 1 : threadCount <= EasyConstants.MAX_THREAD_COUNT ? threadCount : EasyConstants.MAX_THREAD_COUNT;
mExecutor = new ThreadPoolExecutor(
mThreadCount,
mThreadCount,
20,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
mCurrentTaskList = new HashMap<>();
mQueue = (LinkedBlockingQueue<Runnable>) mExecutor.getQueue();
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
logger.error("Thread pool [{}] is exhausted! {}.", threadPoolName, e.toString());
dumpJvmInfoIfNeeded();
if (!e.isShutdown()) {
BlockingQueue<Runnable> queue = e.getQueue();
int discardSize = queue.size() >> 1;
for (int i = 0; i < discardSize; i++) {
queue.poll();
}
queue.offer(r);
}
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
LinkedBlockingDeque<Runnable> deque = (LinkedBlockingDeque) e.getQueue();
Runnable runnable = deque.pollLast();
if (runnable instanceof DownloadFutureTask) {
DownloadFutureTask futureTask = (DownloadFutureTask) runnable;
futureTask.cancel(true);
String url = futureTask.getUrl();
removeDownloadTask(url, futureTask);
}
e.execute(r);
}
}
public AsynchronousPool(final ThreadPoolExecutor threadPoolExecutor, final Duration awaitDuration) {
this.blockingQueue = threadPoolExecutor.getQueue();
this.executor = threadPoolExecutor;
this.awaitDuration = awaitDuration;
}
/**
* Returns maximum number of tasks that can be submitted to given
* pool (with bounded queue) before saturation (when submission
* throws RejectedExecutionException).
*/
static final int saturatedSize(ThreadPoolExecutor pool) {
BlockingQueue<Runnable> q = pool.getQueue();
return pool.getMaximumPoolSize() + q.size() + q.remainingCapacity();
}