下面列出了java.util.concurrent.ThreadPoolExecutor#prestartAllCoreThreads ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* prestartAllCoreThreads starts all corePoolSize threads
*/
public void testPrestartAllCoreThreads() {
final ThreadPoolExecutor p =
new CustomTPE(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
}
}
private ExecutorService getExecutorService() {
BlockingQueue<Runnable> workQueue;
if (config.getWorkerQueueSize() > 0) {
workQueue = new ArrayBlockingQueue<>(config.getWorkerQueueSize());
} else {
workQueue = new LinkedBlockingDeque<>();
}
ThreadPoolExecutor executors = new ThreadPoolExecutor(config.getWorkerThreads(), config.getWorkerThreads(),
0L, TimeUnit.MILLISECONDS, workQueue, r -> new Thread(r, "ThriftWorker")) {
@Override
public void execute(Runnable command) {
super.execute(() -> {
ConsumerServiceImpl.getInstance().setThriftContext(new ThriftContext(command));
command.run();
});
}
};
executors.prestartAllCoreThreads();
return executors;
}
/**
* prestartAllCoreThreads starts all corePoolSize threads
*/
public void testPrestartAllCoreThreads() {
final ThreadPoolExecutor p =
new ThreadPoolExecutor(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
}
}
/**
* prestartAllCoreThreads starts all corePoolSize threads
*/
public void testPrestartAllCoreThreads() {
final ThreadPoolExecutor p =
new CustomTPE(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
}
}
private ThreadPoolExecutor multiThreadUpload(int threadNum, final int threadFileNum) {
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);
pool.prestartAllCoreThreads();
for (int i = 0; i < threadNum; ++i) {
final int threadId = i;
pool.submit(new Runnable() {
@Override
public void run() {
uploadAndDownloadPerform(threadId, threadFileNum);
}
});
}
pool.shutdown();
return pool;
}
static ThreadPoolExecutor getPoolFromNs(final Configuration ns) {
final int maxQueueSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_QUEUE_MAX_LENGTH);
final ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("getDelegate-%d").build();
//begin adaptation of constructor at
//https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
final int maxPoolSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE);
final int corePoolSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE);
final long keepAlive = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE);
final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxQueueSize), factory, new ThreadPoolExecutor.CallerRunsPolicy());
//end adaptation of constructor at
//https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
executor.allowCoreThreadTimeOut(false);
executor.prestartAllCoreThreads();
return executor;
}
@Override
public void start() throws IOException {
if (!isStarted) {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort());
for (ThreadPoolExecutor executor : chunkExecutors) {
executor.prestartAllCoreThreads();
}
server.start();
int realPort =
((RaftServerProxy) server).getServerRpc().getInetSocketAddress()
.getPort();
if (port == 0) {
LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
server.getId(), realPort);
port = realPort;
}
//register the real port to the datanode details.
datanodeDetails.setPort(DatanodeDetails
.newPort(DatanodeDetails.Port.Name.RATIS,
realPort));
isStarted = true;
}
}
public void createThreadPool() {
int maxUserConnected = GlobalContext.getConfig().getMaxUsersTotal();
int maxAliveThreads = maxUserConnected + GlobalContext.getConfig().getMaxUsersExempt();
int minAliveThreads = (int) Math.round(maxAliveThreads * 0.25);
_pool = new ThreadPoolExecutor(minAliveThreads, maxAliveThreads, 3 * 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ConnectionThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
_pool.allowCoreThreadTimeOut(false);
_pool.prestartAllCoreThreads();
}
@Override
protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue,
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
if (preStartAllCoreThreads) {
threadPoolExecutor.prestartAllCoreThreads();
}
return threadPoolExecutor;
}
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
threadPool.setThreadFactory(new NamedThreadFactory("SEV-" + serverConfig.getProtocol().toUpperCase()
+ "-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
if (serverConfig.isPreStartCore()) { // 初始化核心线程池
threadPool.prestartAllCoreThreads();
}
return threadPool;
}
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
threadPool.setThreadFactory(new NamedThreadFactory(
"SEV-BOLT-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
if (serverConfig.isPreStartCore()) { // 初始化核心线程池
threadPool.prestartAllCoreThreads();
}
return threadPool;
}
/**
* 初始化线程池
*/
public void init() {
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
ThreadPoolUtils.buildQueue(queueSize), new NamedThreadFactory(threadPoolName));
if (allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
if (prestartAllCoreThreads) {
executor.prestartAllCoreThreads();
}
}
@Override
public void executeApp() throws Exception {
executor =
new ThreadPoolExecutor(1, 1, 60, MILLISECONDS, Queues.newLinkedBlockingQueue());
// need to pre-create threads, otherwise lambda execution will be captured by the
// initial thread run, and won't really test lambda execution capture
executor.prestartAllCoreThreads();
transactionMarker();
}
public RedisAccessParallel(Map<Integer, String> redisMap) {
super(redisMap);
this.checkResultTimeoutMs = Config.getInstance().getCheckJedisResultTimeoutMs();
pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(Config.getInstance().getRedisAccessThreadNum());
pool.prestartAllCoreThreads();
}
public void start()
{
try
{
_selectorThread = new SelectorThread(this, _numberOfSelectors);
final int corePoolSize = _poolSize;
final int maximumPoolSize = _poolSize;
final long keepAliveTime = _threadKeepAliveTimeout;
final java.util.concurrent.BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
final ThreadFactory factory = _factory;
_executor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MINUTES,
workQueue,
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(factory));
_executor.prestartAllCoreThreads();
_executor.allowCoreThreadTimeOut(true);
for(int i = 0 ; i < _poolSize; i++)
{
_executor.execute(_selectorThread);
}
}
catch (IOException e)
{
throw new TransportException(e);
}
}
private static ThreadPoolExecutor createExecutor() {
int threadCount = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount, 1L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ThreadFactoryFactory.getThreadFactory("rendering"));
executor.prestartAllCoreThreads();
return executor;
}
/**
* Execute la tache avec plusieurs Threads
*
* @param request
* @return
* @throws Exception
*/
private long execute(HttpGetRequestRunnable request, int numberOfRequests, int threads) throws Exception {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
httpClient =
HttpClientBuilder
.create()
.setConnectionManager(connectionManager)
.setMaxConnTotal(threads)
.setMaxConnPerRoute(threads)
.setDefaultRequestConfig(
RequestConfig.custom().setConnectTimeout(10000).setSocketTimeout(10000).build())
.build();
// Warm up
request.run();
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue);
long start = System.currentTimeMillis();
threadPool.prestartAllCoreThreads();
for (int i = 0; i < numberOfRequests; i++) {
threadPool.submit(request);
}
threadPool.shutdown();
// wait maximum 20 s
threadPool.awaitTermination(200, TimeUnit.SECONDS);
connectionManager.shutdown();
if (request.exception != null) {
throw new AssertionFailedError("Exception for request " + request.url + " after " + request.count
+ " requests", request.exception);
}
if (threadPool.getCompletedTaskCount() < threadPool.getTaskCount()) {
// All task were not executed
String msg =
request.url + " : Only " + threadPool.getCompletedTaskCount() + "/" + threadPool.getTaskCount()
+ " have been renderered " + " => Maybe a performance issue";
threadPool.shutdownNow();
fail(msg);
}
long end = System.currentTimeMillis();
long execTime = end - start;
LOG.debug("Executed request " + request.url + " " + numberOfRequests + " times with " + threads
+ " threads in " + execTime + "ms");
return execTime;
}
public void load() {
final int coreThreads = Runtime.getRuntime().availableProcessors();
executor = new ThreadPoolExecutor(coreThreads, coreThreads * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getServer().getName() + " : EventHandler"));
executor.prestartAllCoreThreads();
}
@Test
public void shouldAllowMultipleConnectionsInParallel() throws InterruptedException
{
final int numberOfArchiveClients = 5;
final long connectTimeoutNs = TimeUnit.SECONDS.toNanos(10);
final CountDownLatch latch = new CountDownLatch(numberOfArchiveClients);
final ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(numberOfArchiveClients);
final ManyToOneConcurrentLinkedQueue<AeronArchive> archiveClientQueue = new ManyToOneConcurrentLinkedQueue<>();
final MediaDriver.Context driverCtx = new MediaDriver.Context()
.errorHandler(Tests::onError)
.clientLivenessTimeoutNs(connectTimeoutNs)
.dirDeleteOnStart(true)
.publicationUnblockTimeoutNs(connectTimeoutNs * 2)
.threadingMode(ThreadingMode.SHARED);
final Context archiveCtx = new Context()
.threadingMode(SHARED)
.connectTimeoutNs(connectTimeoutNs);
executor.prestartAllCoreThreads();
try (ArchivingMediaDriver driver = ArchivingMediaDriver.launch(driverCtx, archiveCtx))
{
for (int i = 0; i < numberOfArchiveClients; i++)
{
executor.execute(
() ->
{
final AeronArchive.Context ctx = new AeronArchive.Context().messageTimeoutNs(connectTimeoutNs);
final AeronArchive archive = AeronArchive.connect(ctx);
archiveClientQueue.add(archive);
latch.countDown();
});
}
latch.await(driver.archive().context().connectTimeoutNs() * 2, TimeUnit.NANOSECONDS);
AeronArchive archiveClient;
while (null != (archiveClient = archiveClientQueue.poll()))
{
archiveClient.close();
}
assertEquals(0L, latch.getCount());
}
finally
{
executor.shutdownNow();
archiveCtx.deleteDirectory();
driverCtx.deleteDirectory();
}
}
/**
* Configures this appender instance and makes it ready for use by the
* consumers. It validates mandatory parameters and confirms if the configured
* stream is ready for publishing data yet.
*
* Error details are made available through the fallback handler for this
* appender
*
* @throws IllegalStateException
* if we encounter issues configuring this appender instance
*/
@Override
public void activateOptions() {
if (streamName == null) {
initializationFailed = true;
error("Invalid configuration - streamName cannot be null for appender: " + name);
}
if (layout == null) {
initializationFailed = true;
error("Invalid configuration - No layout for appender: " + name);
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration = setProxySettingsFromSystemProperties(clientConfiguration);
clientConfiguration.setMaxErrorRetry(maxRetries);
clientConfiguration.setRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, maxRetries, true));
clientConfiguration.setUserAgent(AppenderConstants.USER_AGENT_STRING);
BlockingQueue<Runnable> taskBuffer = new LinkedBlockingDeque<Runnable>(bufferSize);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount, threadCount,
AppenderConstants.DEFAULT_THREAD_KEEP_ALIVE_SEC, TimeUnit.SECONDS, taskBuffer, new BlockFastProducerPolicy());
threadPoolExecutor.prestartAllCoreThreads();
kinesisClient = new AmazonKinesisAsyncClient(new CustomCredentialsProviderChain(), clientConfiguration,
threadPoolExecutor);
boolean regionProvided = !Validator.isBlank(region);
if (!regionProvided) {
region = AppenderConstants.DEFAULT_REGION;
}
if (!Validator.isBlank(endpoint)) {
if (regionProvided) {
LOGGER
.warn("Received configuration for both region as well as Amazon Kinesis endpoint. ("
+ endpoint
+ ") will be used as endpoint instead of default endpoint for region ("
+ region + ")");
}
kinesisClient.setEndpoint(endpoint,
AppenderConstants.DEFAULT_SERVICE_NAME, region);
} else {
kinesisClient.setRegion(Region.getRegion(Regions.fromName(region)));
}
DescribeStreamResult describeResult = null;
try {
describeResult = kinesisClient.describeStream(streamName);
String streamStatus = describeResult.getStreamDescription().getStreamStatus();
if (!StreamStatus.ACTIVE.name().equals(streamStatus) && !StreamStatus.UPDATING.name().equals(streamStatus)) {
initializationFailed = true;
error("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name);
}
} catch (ResourceNotFoundException rnfe) {
initializationFailed = true;
error("Stream " + streamName + " doesn't exist for appender: " + name, rnfe);
}
asyncCallHander = new AsyncPutCallStatsReporter(name);
}