下面列出了java.util.concurrent.ThreadPoolExecutor#CallerRunsPolicy ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Creates an instance of this class.
*
* @param nThreads number of threads in the executor used to assist the selector loop and run
* completion handlers.
*/
public AsynchronousTlsChannelGroup(int nThreads) {
try {
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
timeoutExecutor.setRemoveOnCancelPolicy(true);
this.executor =
new ThreadPoolExecutor(
nThreads,
nThreads,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier),
runnable ->
new Thread(runnable, String.format("async-channel-group-%d-handler-executor", id)),
new ThreadPoolExecutor.CallerRunsPolicy());
selectorThread.start();
}
private void clearData(Map<File,DataNode> map,TableMigrateInfo table){
if(table.isError()) {
return;
}
ExecutorService executor = new ThreadPoolExecutor(margs.getThreadCount(), margs.getThreadCount(),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());
Iterator<Entry<File,DataNode>> it = map.entrySet().iterator();
while(it.hasNext()){
Entry<File,DataNode> et = it.next();
File f =et.getKey();
DataNode srcDn = et.getValue();
executor.execute(new DataClearRunner(table, srcDn, f));
}
executor.shutdown();
while(true){
if(executor.isTerminated()){
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
LOGGER.error("error",e);
}
}
}
@PostConstruct
private void init() {
heartBeatThreadSize = soaConfig.getHeartBeatThreadSize();
executor = new ThreadPoolExecutor(1, 1, 3L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
SoaThreadFactory.create("heartbeat",Thread.MAX_PRIORITY, true), new ThreadPoolExecutor.CallerRunsPolicy());
executorRun = new ThreadPoolExecutor(heartBeatThreadSize, heartBeatThreadSize, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), SoaThreadFactory.create("heartbeat-run",Thread.MAX_PRIORITY-1, true),
new ThreadPoolExecutor.CallerRunsPolicy());
soaConfig.registerChanged(new Runnable() {
@Override
public void run() {
if (heartBeatThreadSize != soaConfig.getHeartBeatThreadSize()) {
heartBeatThreadSize = soaConfig.getHeartBeatThreadSize();
executorRun.setCorePoolSize(heartBeatThreadSize);
executorRun.setMaximumPoolSize(heartBeatThreadSize);
}
}
});
executor.execute(() -> {
heartbeat();
});
}
@PostConstruct
public void init() throws MqttException {
final String serverURI = "tcp://localhost:1883";
final MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setMaxInflight(1000);
options.setServerURIs(new String[] {serverURI});
options.setMqttVersion(MQTT_VERSION_3_1_1);
final ThreadFactory threadFactory = new DefaultThreadFactory("mqtt-client-exec");
executorService = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
mqttClient = new MqttClient(serverURI, clientId, persistence, executorService);
mqttClient.setTimeToWait(-1);
mqttClient.connect(options);
mqttClient.setCallback(this);
log.debugf("[MQTT][Connected][client: %s]", clientId);
}
public BatchWriter(EmbeddedSolrServer solr, int batchSize, TaskID tid,
int writerThreads, int queueSize) {
this.solr = solr;
this.writerThreads = writerThreads;
this.queueSize = queueSize;
taskId = tid;
// we need to obtain the settings before the constructor
if (writerThreads != 0) {
batchPool = new ThreadPoolExecutor(writerThreads, writerThreads, 5,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
} else { // single threaded case
batchPool = null;
}
}
static ThreadPoolExecutor getPoolFromNs(final Configuration ns) {
final int maxQueueSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_QUEUE_MAX_LENGTH);
final ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("getDelegate-%d").build();
//begin adaptation of constructor at
//https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
final int maxPoolSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE);
final int corePoolSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE);
final long keepAlive = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE);
final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxQueueSize), factory, new ThreadPoolExecutor.CallerRunsPolicy());
//end adaptation of constructor at
//https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
executor.allowCoreThreadTimeOut(false);
executor.prestartAllCoreThreads();
return executor;
}
/**
* 通过客户端线程池并行查询测试
*/
@Test
public void testParallelFetchByClientExecutorPool() {
ExecutorService threadPool =
new ThreadPoolExecutor(2, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2),
new ThreadPoolExecutor.CallerRunsPolicy());
QueryParam qp = new QueryParam();
new TaskPair("deviceStatFetcher", DeviceRequest.build(qp));
MultiResult ctx =
parallelExePool.submit(
threadPool,
new TaskPair("deviceStatFetcher", DeviceRequest.build(qp)),
new TaskPair("deviceUvFetcher", DeviceRequest.build(qp)));
List<DeviceViewItem> stat = ctx.getResult("deviceStatFetcher");
List<DeviceViewItem> uv = ctx.getResult("deviceUvFetcher");
Assert.notEmpty(stat);
Assert.notEmpty(uv);
System.out.println(stat);
System.out.println(uv);
}
private DynamicallySizedThreadPoolExecutor createInstance(int corePoolSize, int maximumPoolSize, int keepAliveTime)
{
// We need a thread factory
TraceableThreadFactory threadFactory = new TraceableThreadFactory();
threadFactory.setThreadDaemon(true);
threadFactory.setThreadPriority(Thread.NORM_PRIORITY);
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
return new DynamicallySizedThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Inject
public EVCacheClientPoolManager(IConnectionBuilder connectionFactoryprovider, EVCacheNodeList evcacheNodeList, EVCacheConfig evcConfig) {
instance = this;
this.connectionFactoryProvider = connectionFactoryprovider;
this.evcacheNodeList = evcacheNodeList;
this.evcConfig = evcConfig;
this.evcacheEventListenerList = new CopyOnWriteArrayList<EVCacheEventListener>();
String clientCurrentInstanceId = null;
if(clientCurrentInstanceId == null) clientCurrentInstanceId= System.getenv("EC2_INSTANCE_ID");
if(clientCurrentInstanceId == null) clientCurrentInstanceId= System.getenv("NETFLIX_INSTANCE_ID");
if(log.isInfoEnabled()) log.info("\nClient Current InstanceId from env = " + clientCurrentInstanceId);
if(clientCurrentInstanceId == null && EVCacheConfig.getInstance().getPropertyRepository() != null) clientCurrentInstanceId = EVCacheConfig.getInstance().getPropertyRepository().get("EC2_INSTANCE_ID", String.class).orElse(null).get();
if(clientCurrentInstanceId == null && EVCacheConfig.getInstance().getPropertyRepository() != null) clientCurrentInstanceId = EVCacheConfig.getInstance().getPropertyRepository().get("NETFLIX_INSTANCE_ID", String.class).orElse(null).get();
if(clientCurrentInstanceId != null && !clientCurrentInstanceId.equalsIgnoreCase("localhost")) {
this.defaultReadTimeout = EVCacheConfig.getInstance().getPropertyRepository().get("default.read.timeout", Integer.class).orElse(20);
if(log.isInfoEnabled()) log.info("\nClient Current InstanceId = " + clientCurrentInstanceId + " which is probably a cloud location. The default.read.timeout = " + defaultReadTimeout);
} else { //Assuming this is not in cloud so bump up the timeouts
this.defaultReadTimeout = EVCacheConfig.getInstance().getPropertyRepository().get("default.read.timeout", Integer.class).orElse(750);
if(log.isInfoEnabled()) log.info("\n\nClient Current InstanceId = " + clientCurrentInstanceId + ". Probably a non-cloud instance. The default.read.timeout = " + defaultReadTimeout + "\n\n");
}
this.logEnabledApps = EVCacheConfig.getInstance().getPropertyRepository().get("EVCacheClientPoolManager.log.apps", String.class).orElse("*");
this.defaultRefreshInterval = EVCacheConfig.getInstance().getPropertyRepository().get("EVCacheClientPoolManager.refresh.interval", Integer.class).orElse(60);
this.asyncExecutor = new EVCacheScheduledExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(), 30, TimeUnit.SECONDS, new ThreadPoolExecutor.CallerRunsPolicy(), "scheduled");
asyncExecutor.prestartAllCoreThreads();
this.syncExecutor = new EVCacheExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(), 30, TimeUnit.SECONDS, new ThreadPoolExecutor.CallerRunsPolicy(), "pool");
syncExecutor.prestartAllCoreThreads();
initAtStartup();
}
/**
* �����̳߳أ����߳�ִ������
* @param taskid ����ID
* @param batchcase ���������ַ�����#����
* @throws Exception ���쳣
*/
public static void batchCaseExecuteForTast(String taskid, String batchcase) throws Exception{
int threadcount = GetServerApi.cGetTaskSchedulingByTaskId(Integer.parseInt(taskid)).getExThreadCount();
ThreadPoolExecutor threadExecute = new ThreadPoolExecutor(threadcount, 30, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
//ִ��ȫ���dzɹ�״̬����
if(batchcase.contains("ALLFAIL")){
//��ʼ��д��������Լ���־ģ��
serverOperation caselog = new serverOperation();
List<Integer> caseIdList = caselog.getCaseListForUnSucByTaskId(taskid);
for (Integer integer : caseIdList) {
ProjectCase testcase = GetServerApi.cGetCaseByCaseId(integer);
TestControl.THREAD_COUNT++; //���̼߳���++�����ڼ���߳��Ƿ�ȫ��ִ����
threadExecute.execute(new ThreadForBatchCase(testcase.getCaseId(), taskid));
}
}else{ //����ִ������
String[] temp=batchcase.split("#");
LogUtil.APP.info("��ǰ����ִ�������й��С�{}��������������...",temp.length);
for (String s : temp) {
TestControl.THREAD_COUNT++; //���̼߳���++�����ڼ���߳��Ƿ�ȫ��ִ����
threadExecute.execute(new ThreadForBatchCase(Integer.valueOf(s), taskid));
}
}
//���̼߳��������ڼ���߳��Ƿ�ȫ��ִ����
int i=0;
while(TestControl.THREAD_COUNT!=0){
i++;
if(i>600){
break;
}
Thread.sleep(6000);
}
threadExecute.shutdown();
}
/**
* execute using CallerRunsPolicy drops task on shutdown
*/
public void testCallerRunsOnShutdown() {
RejectedExecutionHandler h = new ThreadPoolExecutor.CallerRunsPolicy();
final ThreadPoolExecutor p =
new ThreadPoolExecutor(1, 1,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1), h);
try { p.shutdown(); } catch (SecurityException ok) { return; }
try (PoolCleaner cleaner = cleaner(p)) {
TrackedNoOpRunnable r = new TrackedNoOpRunnable();
p.execute(r);
assertFalse(r.done);
}
}
/**
* Creates all objects needed for a traversal.
*
* @param context the context used to get the configuration
* @throws Exception if configuration parameters are invalid
*/
@Override
public void init(IndexingConnectorContext context) throws Exception {
checkState(Configuration.isInitialized(), "configuration not initialized");
indexingService = checkNotNull(context.getIndexingService());
defaultAcl = DefaultAcl.fromConfiguration(indexingService);
repositoryContext =
new RepositoryContext.Builder()
.setEventBus(new EventBus("EventBus-" + getClass().getName()))
.setDefaultAclMode(defaultAcl.getDefaultAclMode())
.build();
if (checkpointHandler == null) {
checkpointHandler = LocalFileCheckpointHandler.fromConfiguration();
}
useQueues = Configuration.getBoolean(TRAVERSE_USE_QUEUES, DEFAULT_USE_QUEUES).get();
QueueCheckpoint.Builder builder = new QueueCheckpoint.Builder(useQueues);
if (useQueues) {
String tag =
Configuration.getString(TRAVERSE_QUEUE_TAG, repository.getClass().getSimpleName()).get();
builder.setCheckpointHandler(checkpointHandler)
.setQueueA(QUEUE_NAME + "A: " + tag)
.setQueueB(QUEUE_NAME + "B: " + tag);
}
queueCheckpoint = builder.build();
numThreads = Configuration.getInteger(NUM_THREADS, DEFAULT_THREAD_NUM).get();
// TODO(bmj): Fix this gross violation of encapsulation.
numToAbort = Configuration
.getValue(TraverseExceptionHandlerFactory.TRAVERSE_EXCEPTION_HANDLER, 0L, value -> {
if (IGNORE_FAILURE.equals(value)) {
return Long.MAX_VALUE;
} else {
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
throw new InvalidConfigurationException(
"Unrecognized value for traversal exception handler: " + value, e);
}
}
}).get();
threadPoolExecutor =
new ThreadPoolExecutor(
numThreads,
numThreads,
0,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10 * numThreads),
new ThreadPoolExecutor.CallerRunsPolicy());
listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
partitionSize = Configuration.getInteger(TRAVERSE_PARTITION_SIZE, DEFAULT_PARTITION_SIZE).get();
Configuration.checkConfiguration(
partitionSize > 0,
"Partition size can not be less than or equal to 0. Configured value %s",
partitionSize);
repositoryContext.getEventBus().register(this);
repository.init(repositoryContext);
logger.log(Level.INFO, "start full traversal connector executors");
}
@Override
protected void validate() throws Exception {
super.validate();
archiveStore = archiveStore != null ? archiveStore : Plugins.ARCHIVESTORE.get();
archiveStore.setNameSpace(archiveConfig.getNamespace());
logger.info("Get archive store namespace [{}] by archive config.", archiveConfig.getNamespace());
Preconditions.checkArgument(archiveStore != null, "archive store can not be null.");
Preconditions.checkArgument(archiveConfig != null, "archive config can not be null.");
this.tracer = Plugins.TRACERERVICE.get(archiveConfig.getTracerType());
this.batchNum = archiveConfig.getProduceBatchNum();
this.archiveQueue = new LinkedBlockingDeque<>(archiveConfig.getLogQueueSize());
this.executorService = new ThreadPoolExecutor(archiveConfig.getWriteThreadNum(), archiveConfig.getWriteThreadNum(),
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(archiveConfig.getThreadPoolQueueSize()), new NamedThreadFactory("sendLog-archive"), new ThreadPoolExecutor.CallerRunsPolicy());
this.updateItemThread = LoopThread.builder()
.sleepTime(1000 * 10, 1000 * 10)
.name("UpdateArchiveItem-Thread")
.onException(e -> logger.warn("Exception:", e))
.doWork(() -> {
// 更新item列表
updateArchiveItem();
// 同步归档位置
syncArchivePosition();
}).build();
this.readMsgThread = LoopThread.builder()
.sleepTime(0, 10)
.name("ReadArchiveMsg-Thread")
.onException(e -> logger.warn("Exception:", e))
.doWork(() -> {
// 消费接口读取消息,放入队列
readArchiveMsg();
}).build();
this.writeMsgThread = LoopThread.builder()
.sleepTime(10, 10)
.name("WriteArchiveMsg-Thread")
.onException(e -> logger.warn("Exception:", e))
.doWork(() -> {
// 队列读取消息,放入归档存储
write2Store();
}).build();
}
public static void visitParallel(ISliceViewIterator iterator, final SliceVisitor visitor, int nProcessors) throws Exception {
//Can't just farm out each slice to a separate thread, need to block when thread pool full,
//other wise there is the potential run out of memory from loading all the data before any is processed
//use one less thread than processors, as we are using one for the rejectedhandler
nProcessors = nProcessors - 1;
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(nProcessors);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
final ExecutorService pool = new ThreadPoolExecutor(nProcessors, nProcessors,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
final SliceVisitor parallel = new SliceVisitor() {
@Override
public void visit(final IDataset slice) throws Exception {
pool.execute(new Runnable() {
@Override
public void run() {
try {
visitor.visit(slice);
} catch (Throwable ne) {
ne.printStackTrace();
// TODO Fix me - should runtime exception really be thrown back to Fork/Join?
//throw new RuntimeException(ne.getMessage(), ne);
}
}
});
}
@Override
public boolean isCancelled() {
return visitor.isCancelled();
}
};
Slicer.visit(iterator, parallel);
pool.shutdown();
while (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)){
if (visitor.isCancelled()) break;
}
}
public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
return new ThreadPoolExecutor(numThreads, numThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
public StressAddMultiThreadedTest() {
queue = new ArrayBlockingQueue<>(THREADS);
executor = new ThreadPoolExecutor(THREADS, THREADS, 100,
TimeUnit.MILLISECONDS, queue,
new ThreadPoolExecutor.CallerRunsPolicy());
}
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
public CurrentThreadExecutorService() {
super(0, 1, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
}
/***
* Returns an executor that holds up to maxQueueSize unprocessed tasks
* and processes maxThreads in parallel. When more than maxQueueSize tasks
* are submitted, the task is rejected from the queue and executed in the
* calling thread directly
*
* @param maxThreads
* @param maxQueueSize
* @return
*/
public static ThreadPoolExecutor getBoundThreadPoolExecutor(final int maxThreads, final int maxQueueSize) {
// The queue holds up to maxQueueSize unexecuted tasks
final BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(maxQueueSize);
// The Pool executor executes up to maxThreads in parallel. If the queue is full, the caller
// has to execute the task directly.
return new ThreadPoolExecutor(maxThreads / 2, maxThreads, 30, TimeUnit.SECONDS,
linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* Returns a counting handler for rejected tasks that runs the rejected task directly in the
* calling thread of the execute method, unless the executor has been shut down, in which case
* the task is discarded.
*/
public static CountingRejectedExecutionHandler newCallerRunsPolicy() {
return new CountingRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}