下面列出了怎么用java.util.concurrent.ThreadPoolExecutor的API类实例代码及写法,或者点击链接到github查看源代码。
public void execute() {
ThreadPoolExecutor exec = ss.getSource().getProcessor().getExecutor();
for (final RouteResultsetNode rrn : nodes) {
final Channel c = ss.getTarget().get(rrn);
if (c != null && !c.isRunning() && !c.isClosed()) {
c.setRunning(true);
exec.execute(new Runnable() {
@Override
public void run() {
execute0(rrn, c, autocommit, ss, flag, sql, exeTime);
}
});
} else {
newExecute(rrn, autocommit, ss, flag, sql, exeTime);
}
}
}
@Override
public void start() {
if (startFlag.compareAndSet(false, true)) {
updateCache();
executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(50),
SoaThreadFactory.create("QueueService", true), new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> {
// 因为第一次的时候,会由topic和dbnode 触发初始化,所以自身初始化可以减少一次
checkChanged();
while (isRunning) {
try {
updateCache();
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
log.error("QueueServiceImpl_initCache_error", e);
}
Util.sleep(soaConfig.getMqQueueCacheInterval());
}
});
}
}
public Session(ThreadPoolExecutor executorService,
AuthenticationService authenticationService,
ExtendedList<Account> accountList,
ExtendedList<Credential> credentialList,
ExtendedList<Proxy> proxyList,
ExtendedList<Objective> objectiveList,
ExtendedList<AccountOutput> outputList,
Tracker tracker,
URL checkUrl,
int proxyTimeout) {
this.executorService = executorService;
this.authenticationService = authenticationService;
this.accountList = accountList;
this.credentialList = credentialList;
this.proxyList = proxyList;
this.objectiveList = objectiveList;
this.outputList = outputList;
this.tracker = tracker;
this.tracker.setTotal(accountList.size());
this.checkUrl = checkUrl;
this.proxyTimeout = proxyTimeout;
}
public SocketService(SocketListener listener) {
this.listener = listener;
try {
ServerSocket serverSocket = new ServerSocket(PORT);
System.out.println("server running " + PORT + " port");
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
5000,
TimeUnit.MILLISECONDS,
queue
);
while (true) {
Socket socket = serverSocket.accept();
executor.execute(new processMsg(socket));
}
} catch (Exception e) {
System.out.println("SocketServer create Exception:" + e);
}
}
/**
* ThreadPoolExecutor线程池
*/
private static void threadPoolExecutor() {
int corePoolSize=5;
int maximumPoolSize=10;
long keepAliveTime=2L;
// 线程核心数,最大线程数,线程缓存时间,时间格式,缓存队列 ,线程工厂,拒绝策略
ThreadPoolExecutor tpx=new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 产生一个任务,并将其加入到线程池
String task = "[email protected] " + i;
// System.out.println("put " + task);
tpx.execute(new MyThread(task));
// 便于观察,等待一段时间
Thread.sleep(20);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public JavaCronetEngine(CronetEngineBuilderImpl builder) {
// On android, all background threads (and all threads that are part
// of background processes) are put in a cgroup that is allowed to
// consume up to 5% of CPU - these worker threads spend the vast
// majority of their time waiting on I/O, so making them contend with
// background applications for a slice of CPU doesn't make much sense.
// We want to hurry up and get idle.
final int threadPriority =
builder.threadPriority(THREAD_PRIORITY_BACKGROUND + THREAD_PRIORITY_MORE_FAVORABLE);
this.mUserAgent = builder.getUserAgent();
this.mExecutorService = new ThreadPoolExecutor(10, 20, 50, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
return Executors.defaultThreadFactory().newThread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("JavaCronetEngine");
android.os.Process.setThreadPriority(threadPriority);
r.run();
}
});
}
});
}
public static ThreadPoolExecutor newSingleDaemonThreadExecutor(long keepAliveTime) {
return new ThreadPoolExecutor(1,
1,
keepAliveTime,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
DaemonThreadFactory.daemonThreadFactory,
(r, exe) -> {
if (!exe.isShutdown()) {
try {
exe.getQueue().put(r);
} catch (InterruptedException e) {
// ignore
}
}
});
}
private AtomicLong test_unit() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
statsItemSet.addValue("topicTest", 2, 1);
}
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10) {
break;
}
Thread.sleep(1000);
}
return statsItemSet.getStatsItem("topicTest").getValue();
}
/**
* 作业平台使用的线程池
* @return
*/
@Bean(name = "uKeFuTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
// 线程池维护线程的最少数量
poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
// 线程池维护线程的最大数量
poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
// 线程池所使用的缓冲队列
poolTaskExecutor.setQueueCapacity(200);
// 线程池维护线程所允许的空闲时间
poolTaskExecutor.setKeepAliveSeconds(30);
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return poolTaskExecutor;
}
/**
* 逻辑调用结束后处理阶段
*/
private void finallyProcess() {
final ThreadPoolExecutor service = config.getService();
if (service == null) {
throw new SchedulerException("ThreadPoolExecutor不能为空");
}
if (!close && !service.isShutdown()) {
final long interval = delay();
final int runNumberOfTimes = config.getRunNumberOfTimes();
if (runNumberOfTimes == 0) {
thisWait(interval);
} else {
nowTimes++;
if (nowTimes < runNumberOfTimes) {
thisWait(interval);
} else {
close = true;
nowTimes = 0;
}
}
}
}
/**
* Tests that a given rule rule will be computed and added once to the cache
* whatever the number of times this rule is called concurrently.
*/
@Test
public void testConcurrentCreation() throws InterruptedException,
ExecutionException {
// Number of times the same rule will be called.
final int numTasks = 20;
final ThreadPoolExecutor exec
= new ThreadPoolExecutor(3, numTasks, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2));
final List<Future<Pair<double[], double[]>>> results
= new ArrayList<Future<Pair<double[], double[]>>>();
for (int i = 0; i < numTasks; i++) {
results.add(exec.submit(new RuleBuilder()));
}
// Ensure that all computations have completed.
for (Future<Pair<double[], double[]>> f : results) {
f.get();
}
// Assertion would fail if "getRuleInternal" were not "synchronized".
final int n = RuleBuilder.getNumberOfCalls();
Assert.assertEquals("Rule computation was called " + n + " times", 1, n);
}
private static ExecutorService newFixedThreadPool(
final int nThreads,
final String threadFactoryName,
final int queueSize
) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize),
new ThreadFactoryBuilder()
.setNameFormat(threadFactoryName)
.build(),
(r, executor) -> {
// this will block if the queue is full
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
});
}
public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions, final int valueSizeThreshold,
final int maxSegmentFileSize, final int preAllocateSegmentCount,
final int keepInMemorySegmentCount, final int checkpointIntervalMs,
final ThreadPoolExecutor writeExecutor) {
super(path, raftOptions);
if (Platform.isMac()) {
LOG.warn("RocksDBSegmentLogStorage is not recommended on mac os x, it's performance is poorer than RocksDBLogStorage.");
}
Requires.requireTrue(maxSegmentFileSize > 0, "maxSegmentFileSize is not greater than zero");
Requires.requireTrue(preAllocateSegmentCount > 0, "preAllocateSegmentCount is not greater than zero");
Requires.requireTrue(checkpointIntervalMs > 0, "checkpointIntervalMs is not greater than zero");
Requires.requireTrue(keepInMemorySegmentCount > 0, "keepInMemorySegmentCount is not greater than zero");
this.segmentsPath = path + File.separator + "segments";
this.abortFile = new AbortFile(this.segmentsPath + File.separator + "abort");
this.checkpointFile = new CheckpointFile(this.segmentsPath + File.separator + "checkpoint");
this.valueSizeThreshold = valueSizeThreshold;
this.maxSegmentFileSize = maxSegmentFileSize;
this.writeExecutor = writeExecutor == null ? createDefaultWriteExecutor() : writeExecutor;
this.preAllocateSegmentCount = preAllocateSegmentCount;
this.checkpointIntervalMs = checkpointIntervalMs;
this.keepInMemorySegmentCount = keepInMemorySegmentCount;
}
@Override
public synchronized void start() {
if (state != JournalState.STOPPED) {
throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state);
}
if (providedIOThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ArtemisIOThread", true, JournalImpl.class.getClassLoader());
}
});
threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue(), factory);
ioExecutorFactory = new OrderedExecutorFactory(threadPool);
} else {
ioExecutorFactory = providedIOThreadPool;
}
filesExecutor = ioExecutorFactory.getExecutor();
compactorExecutor = ioExecutorFactory.getExecutor();
appendExecutor = ioExecutorFactory.getExecutor();
filesRepository.setExecutor(filesExecutor);
fileFactory.start();
setJournalState(JournalState.STARTED);
}
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
public static ExecutorService newThreadPool(BlockingQueue<Runnable> workQueue, String prefix, int threadCount,
boolean watch) {
BlurThreadPoolExecutor executorService = new BlurThreadPoolExecutor(threadCount, threadCount, 60L,
TimeUnit.SECONDS, workQueue, new BlurThreadFactory(prefix));
executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executorService.add(new UserThreadBoundaryProcessor());
if (watch) {
return Trace.getExecutorService(ThreadWatcher.instance().watch(executorService));
}
return Trace.getExecutorService(executorService);
}
public static ThreadPoolExecutor initPool(ServerConfig serverConfig) {
int minPoolSize = serverConfig.getCoreThreads();
int maxPoolSize = serverConfig.getMaxThreads();
int queueSize = serverConfig.getQueues();
int aliveTime = serverConfig.getAliveTime();
BlockingQueue<Runnable> poolQueue = queueSize > 0 ? new LinkedBlockingQueue<Runnable>(
queueSize) : new SynchronousQueue<Runnable>();
return new ThreadPoolExecutor(minPoolSize, maxPoolSize, aliveTime, TimeUnit.MILLISECONDS, poolQueue);
}
@Test
public synchronized void testConcurrentConnections() throws InterruptedException {
Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
icebergTable.updateProperties()
.set(COMMIT_NUM_RETRIES, "20")
.set(COMMIT_MIN_RETRY_WAIT_MS, "25")
.set(COMMIT_MAX_RETRY_WAIT_MS, "25")
.commit();
String fileName = UUID.randomUUID().toString();
DataFile file = DataFiles.builder(icebergTable.spec())
.withPath(FileFormat.PARQUET.addExtension(fileName))
.withRecordCount(2)
.withFileSizeInBytes(0)
.build();
ExecutorService executorService = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(7));
for (int i = 0; i < 7; i++) {
executorService.submit(() -> icebergTable.newAppend().appendFile(file).commit());
}
executorService.shutdown();
Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
Assert.assertEquals(7, Iterables.size(icebergTable.snapshots()));
}
public TestCaseAServlet() {
String backendMockHost = System.getProperty("perf.test.backend.hostname", "127.0.0.1");
String backendMockPort = System.getProperty("perf.test.backend.port", "8989");
backendMockUriPrefix = "http://" + backendMockHost + ':' + backendMockPort + "/ws-backend-mock";
final RequestConfig reqConfig = RequestConfig.custom()
.setConnectTimeout(PropertyNames.ClientConnectTimeout.getValueAsInt())
.setSocketTimeout(PropertyNames.ClientSocketTimeout.getValueAsInt())
.setConnectionRequestTimeout(PropertyNames.ClientConnectionRequestTimeout.getValueAsInt())
.build();
// don't care about total vs. per-route right now, will set them to the same
final PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();
connMgr.setMaxTotal(PropertyNames.ClientMaxConnectionsTotal.getValueAsInt());
connMgr.setDefaultMaxPerRoute(PropertyNames.ClientMaxConnectionsTotal.getValueAsInt());
client = HttpClients.custom()
.setDefaultRequestConfig(reqConfig)
.setConnectionManager(connMgr)
.build();
// used for parallel execution
final int backendRequestThreadPoolSize = PropertyNames.BackendRequestThreadPoolSize.getValueAsInt();
// setting core and max pool sizes the same since I do not want any queueing in here
executor = new ThreadPoolExecutor(backendRequestThreadPoolSize,
backendRequestThreadPoolSize,
5,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>());
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
/**
* Given a {@link ThreadPoolExecutor}, attach various {@link Gauge}s against its monitoring
* properties.
* @param executor {@link ThreadPoolExecutor} to monitor.
* @param threadPoolName a unique name for this thread pool.
*/
public static void instrument(final ThreadPoolExecutor executor, String threadPoolName) {
MetricRegistry registry = Metrics.getRegistry();
registry.register(name(threadPoolName, "queue-size"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return executor.getQueue().size();
}
});
registry.register(name(threadPoolName, "queue-max"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return executor.getQueue().size() + executor.getQueue().remainingCapacity();
}
});
registry.register(name(threadPoolName, "threadpool-active"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return executor.getActiveCount();
}
});
registry.register(name(threadPoolName, "threadpool-max"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return executor.getMaximumPoolSize();
}
});
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
throw new RejectedExecutionException(LocalizedStrings.PooledExecutorWithDMStats_EXECUTOR_HAS_BEEN_SHUTDOWN.toLocalizedString());
} else {
try {
executor.getQueue().put(r);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
RejectedExecutionException e = new RejectedExecutionException(LocalizedStrings.PooledExecutorWithDMStats_INTERRUPTED.toLocalizedString());
e.initCause(ie);
throw e;
}
}
}
/**
* pull block and trans from chain async
* @return ThreadPoolTaskExecutor
*/
@Bean
public ThreadPoolTaskExecutor mgrAsyncExecutor() {
log.info("start mgrAsyncExecutor init..");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
executor.setQueueCapacity(executorProperties.getQueueSize());
executor.setThreadNamePrefix(executorProperties.getThreadNamePrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// init executor
executor.initialize();
return executor;
}
public static ThreadPoolExecutor getPoll(){
AsyncTaskProperties properties = SpringContextHolder.getBean(AsyncTaskProperties.class);
return new ThreadPoolExecutor(
properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveSeconds(),
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(properties.getQueueCapacity()),
new TheadFactoryName()
);
}
FutureQueryExecuteCallback(ThreadPoolExecutor threadPool, boolean asyn,
ExecuteCallback callback, CyclicBarrier barriar,
Transaction trasaction) {
this.threadPool = threadPool;
this.asyn = asyn;
this.callback = callback;
this.barriar = barriar;
this.trasaction = trasaction;
}
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("asyncTaskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
private void startJobConfigProcessThread() {
int configThreadCount = DEFAULT_CONFIG_THREAD_COUNT;
LOG.info("Job Config crawler main thread started, pool size: " + DEFAULT_CONFIG_THREAD_COUNT);
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable runnable) {
count.incrementAndGet();
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setName("config-crawler-workthread-" + count.get());
return thread;
}
};
ThreadPoolExecutor pool = new ThreadPoolExecutor(configThreadCount, configThreadCount, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
while (true) {
JobContext context;
try {
context = queueOfConfig.take();
LOG.info("queueOfConfig size: " + queueOfConfig.size());
Runnable configCrawlerThread = new ConfigWorkTask(new JobContext(context), fetcher, callback, this);
pool.execute(configCrawlerThread);
} catch (InterruptedException e) {
LOG.warn("Got an InterruptedException: " + e.getMessage());
} catch (RejectedExecutionException e2) {
LOG.warn("Got RejectedExecutionException: " + e2.getMessage());
}
catch (Throwable t) {
LOG.warn("Got an throwable t, " + t.getMessage());
}
}
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
if (runnable instanceof CompactionRunner) {
CompactionRunner runner = (CompactionRunner) runnable;
LOG.debug("Compaction Rejected: " + runner);
if (runner.compaction != null) {
runner.store.cancelRequestedCompaction(runner.compaction);
}
}
}
@Override
protected ExecutorService newExecutorService(Properties batchConfig) {
return new ThreadPoolExecutor(
getInt(batchConfig, BOUNDED_THREADPOOL_MIN_POOL_SIZE, "3"),
getInt(batchConfig, BOUNDED_THREADPOOL_MAX_POOL_SIZE, "10"),
getInt(batchConfig, BOUNDED_THREADPOOL_KEEP_ALIVE_TIME, "900"), TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(getInt(batchConfig, BOUNDED_THREADPOOL_QUEUE_SIZE, "4096")),
BatcheeThreadFactory.INSTANCE);
}
/**
* 固定大小线程池,自定义队列、线程池工厂和拒绝策略
*
* @param corePoolSize 初始化线程池
* @param queue 线程池队列
* @param threadFactory 线程池工厂
* @param handler 拒绝策略
* @return the thread pool executor
*/
public static ThreadPoolExecutor newFixedThreadPool(int corePoolSize,
BlockingQueue<Runnable> queue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
return new ThreadPoolExecutor(corePoolSize,
corePoolSize,
0,
TimeUnit.MILLISECONDS,
queue,
threadFactory,
handler);
}