下面列出了 java.util.concurrent.ThreadPoolExecutor#setThreadFactory() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates a fixed-size thread pool whose workers are named daemon threads.
 *
 * <p>Each worker is placed in the thread group of whichever thread ends up
 * invoking the factory, runs at {@link Thread#NORM_PRIORITY}, and is named
 * {@code <prefix>-<sequence>} where the sequence starts at 1.
 *
 * @param n_proc the number of threads in the pool
 * @param namePrefix the thread name prefix, or {@code null} to use the name
 *        of the thread group the worker is created in
 * @param use_caching_thread when {@code true} workers are {@code CachingThread}
 *        instances, otherwise plain {@link Thread} instances
 * @return the configured executor
 */
static public final ThreadPoolExecutor newFixedThreadPool(final int n_proc, final String namePrefix, final boolean use_caching_thread) {
    final ThreadFactory namedDaemonFactory = new ThreadFactory() {
        /** Source of the numeric suffix appended to every thread name. */
        private final AtomicInteger sequence = new AtomicInteger(0);

        @Override
        public Thread newThread(final Runnable task) {
            final ThreadGroup group = Thread.currentThread().getThreadGroup();
            final String prefix = (namePrefix != null) ? namePrefix : group.getName();
            final String threadName = new StringBuilder(prefix)
                    .append('-')
                    .append(sequence.incrementAndGet())
                    .toString();
            final Thread worker = use_caching_thread
                    ? new CachingThread(group, task, threadName)
                    : new Thread(group, task, threadName);
            worker.setDaemon(true);
            worker.setPriority(Thread.NORM_PRIORITY);
            return worker;
        }
    };
    final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(n_proc);
    pool.setThreadFactory(namedDaemonFactory);
    return pool;
}
/**
 * Sets up the launcher thread pool and a derived configuration, then
 * delegates to the superclass initialization.
 *
 * <p>The pool size is read from {@code RM_AMLAUNCHER_THREAD_COUNT} and its
 * threads are named {@code "ApplicationMasterLauncher #<n>"}. A copy of
 * {@code conf} is made in which the IPC client's max-retries-on-socket-timeout
 * key is set to the value of {@code RM_NODEMANAGER_CONNECT_RETIRES}; that copy
 * is the configuration stored and passed on.
 *
 * @param conf the configuration to initialize from
 * @throws Exception if the superclass initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
    final int launcherThreads = conf.getInt(
        YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT,
        YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT);
    final ThreadFactory namedFactory =
        new ThreadFactoryBuilder().setNameFormat("ApplicationMasterLauncher #%d").build();

    // Core and maximum size are identical, so the pool is effectively fixed-size.
    launcherPool = new ThreadPoolExecutor(launcherThreads, launcherThreads, 1,
        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
    launcherPool.setThreadFactory(namedFactory);

    // Work on a copy so the caller's configuration object is left untouched.
    final Configuration launcherConf = new YarnConfiguration(conf);
    final int nodeManagerRetries = conf.getInt(
        YarnConfiguration.RM_NODEMANAGER_CONNECT_RETIRES,
        YarnConfiguration.DEFAULT_RM_NODEMANAGER_CONNECT_RETIRES);
    launcherConf.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
        nodeManagerRetries);

    setConfig(launcherConf);
    super.serviceInit(launcherConf);
}
/**
 * Creates an executor backed by a fixed-size thread pool whose threads all
 * carry the given name and live in a thread group of the same name.
 *
 * @param name the name used for the thread group and for every worker thread
 * @param numThreads the core and maximum size of the pool
 * @param requestTimeout the request timeout stored on this executor
 */
public ThreadedExecutor(String name, int numThreads, long requestTimeout) {
    _name = name;
    _requestTimeout = requestTimeout;

    // The queue always offers with a timeout, so maximumPoolSize is kept equal
    // to corePoolSize: otherwise every execute() call would be delayed by
    // <requestTimeout> even when another thread could have been added. With
    // both sizes equal, the keepAliveTime value has no meaning.
    final BlockingQueue<Runnable> workQueue = new MyBlockingQueue<Runnable>();
    _pool = new ThreadPoolExecutor(numThreads, numThreads, Long.MAX_VALUE,
            TimeUnit.MILLISECONDS, workQueue);

    // Give every thread used by this executor the same, recognizable name.
    final ThreadGroup workerGroup = new ThreadGroup(_name);
    final ThreadFactory namedFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            return new Thread(workerGroup, task, _name);
        }
    };
    _pool.setThreadFactory(namedFactory);
}
/**
 * Registers a supervision listener together with a dedicated thread pool on
 * which it will be notified. The whole registration is performed while
 * holding the write lock of {@code listenerLock}.
 *
 * @param supervisionListener the listener that should be notified of supervision changes
 * @param numberThreads the number of threads <b>this</b> listener should be notified on
 *        (used as both core and maximum size; core threads are allowed to time out)
 * @param queueSize the capacity of the queue used for pending supervision events (should be
 *        chosen according to the number of DAQs/Equipments and the length of the expected
 *        tasks; the pool uses an abort policy, so submissions are rejected with a runtime
 *        exception once the queue is full)
 * @return a lifecycle handle wrapping the executor created for this listener
 */
@Override
public Lifecycle registerAsListener(final SupervisionListener supervisionListener, final int numberThreads, final int queueSize) {
    listenerLock.writeLock().lock();
    try {
        final ThreadPoolExecutor listenerPool = new ThreadPoolExecutor(
                numberThreads, numberThreads,
                DEFAULT_THREAD_TIMEOUT, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.AbortPolicy());

        // Both numbers in the name are sampled at the moment the thread is created.
        listenerPool.setThreadFactory(task -> new Thread(task,
                "Supervision-" + executors.size() + "-" + listenerPool.getActiveCount()));
        listenerPool.allowCoreThreadTimeOut(true);

        executors.put(supervisionListener, listenerPool);
        supervisionListeners.add(supervisionListener);
        return new ExecutorLifecycleHandle(listenerPool);
    } finally {
        listenerLock.writeLock().unlock();
    }
}
/**
 * Returns the executor for this webclient, creating it on first use.
 *
 * <p>The lazily created executor is a fixed pool of 10 threads whose default
 * thread factory is wrapped in a {@code ThreadNamingFactory}.
 *
 * @return the executor
 */
public synchronized Executor getExecutor() {
    if (executor_ != null) {
        return executor_;
    }

    final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    // Decorate the pool's own factory rather than replacing it outright.
    pool.setThreadFactory(new ThreadNamingFactory(pool.getThreadFactory()));
    executor_ = pool;
    return executor_;
}
/**
 * Creates and configures the business thread pool for the given server.
 *
 * <p>Worker threads are named {@code SEV-<PROTOCOL>-BIZ-<port>}, with the
 * protocol upper-cased using {@link java.util.Locale#ROOT} so the thread name
 * does not depend on the JVM's default locale (for example, a Turkish default
 * locale would otherwise map {@code i} to a dotted capital {@code İ}).
 *
 * @param serverConfig the server configuration supplying protocol, port,
 *        daemon flag and the pre-start setting
 * @return the configured thread pool
 */
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
    ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
    String protocolName = serverConfig.getProtocol().toUpperCase(java.util.Locale.ROOT);
    threadPool.setThreadFactory(new NamedThreadFactory("SEV-" + protocolName
        + "-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
    threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
    if (serverConfig.isPreStartCore()) { // pre-start all core threads
        threadPool.prestartAllCoreThreads();
    }
    return threadPool;
}
/**
 * Creates and configures the business thread pool for the given server.
 *
 * <p>Worker threads are named {@code SEV-TRIPLE-BIZ-<port>}.
 *
 * @param serverConfig the server configuration supplying port, daemon flag
 *        and the pre-start setting
 * @return the configured thread pool
 */
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
    final ThreadPoolExecutor bizPool = BusinessPool.initPool(serverConfig);

    final String threadNamePrefix = "SEV-TRIPLE-BIZ-" + serverConfig.getPort();
    bizPool.setThreadFactory(new NamedThreadFactory(threadNamePrefix, serverConfig.isDaemon()));
    bizPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());

    // Optionally start every core thread up front instead of on demand.
    final boolean preStart = serverConfig.isPreStartCore();
    if (preStart) {
        bizPool.prestartAllCoreThreads();
    }
    return bizPool;
}
/**
 * Builds the business thread pool for the given server and applies naming,
 * rejection handling and the optional core-thread pre-start.
 *
 * <p>Worker threads are named {@code SEV-BOLT-BIZ-<port>}.
 *
 * @param serverConfig the server configuration supplying port, daemon flag
 *        and the pre-start setting
 * @return the configured thread pool
 */
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
    final ThreadPoolExecutor pool = BusinessPool.initPool(serverConfig);

    final NamedThreadFactory boltFactory = new NamedThreadFactory(
        "SEV-BOLT-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon());
    pool.setThreadFactory(boltFactory);
    pool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());

    // Start all core threads eagerly when the configuration asks for it.
    if (serverConfig.isPreStartCore()) {
        pool.prestartAllCoreThreads();
    }
    return pool;
}
/**
 * The factory passed to setThreadFactory is the very instance that
 * getThreadFactory subsequently returns.
 */
public void testSetThreadFactory() {
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner ignored = cleaner(pool)) {
        final ThreadFactory replacement = new SimpleThreadFactory();
        pool.setThreadFactory(replacement);
        assertSame(replacement, pool.getThreadFactory());
    }
}
/**
 * After setThreadFactory, getThreadFactory hands back the same factory
 * instance (exercised on a CustomTPE).
 */
public void testSetThreadFactory() {
    final ThreadPoolExecutor executor = new CustomTPE(
        1, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner ignored = cleaner(executor)) {
        final ThreadFactory installed = new SimpleThreadFactory();
        executor.setThreadFactory(installed);
        assertSame(installed, executor.getThreadFactory());
    }
}
/**
 * Starts the engine by creating its single-threaded executor and the fair
 * semaphore sized by {@code queueSize}.
 *
 * @param queueSize the number of permits the semaphore is created with
 */
public void start(int queueSize) {
    // One worker thread only. The parallel work happens inside the
    // de-compressor itself; here a fixed processing order is wanted. Before
    // raising this, check how the de-compressed data are sent to the GUI (!).
    final int workerCount = 1;
    final long keepAliveMillis = 0L;
    executor = new ThreadPoolExecutor(workerCount, workerCount, keepAliveMillis, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());
    executor.setThreadFactory(new DefaultThreadFactoryEx("DeCompressorEngine"));

    // Rejection policy: block when the queue is full. A full queue means
    // decompression is too slow; the network receiving thread then stops
    // reading from the assisted side, which slows down its capture sending
    // and leaves time to catch up.
    //
    // A full queue is considered unlikely: the network limits the number of
    // captures/tiles sent, and decompressing is expected to be much faster
    // than compressing (unless this PC is much weaker than the assisted one).
    final boolean fair = true;
    semaphore = new Semaphore(queueSize, fair);
}
/**
 * Creates the thread pool used for delegation token renewal.
 *
 * <p>The pool size comes from {@code RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT};
 * threads are named {@code "DelegationTokenRenewer #<n>"} and, core threads
 * included, are allowed to time out after 3 seconds.
 *
 * @param conf the configuration the thread count is read from
 * @return the configured thread pool
 */
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
    final int renewerThreads = conf.getInt(
        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
        YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);

    final ThreadFactory renewerFactory =
        new ThreadFactoryBuilder().setNameFormat("DelegationTokenRenewer #%d").build();

    final ThreadPoolExecutor renewerPool = new ThreadPoolExecutor(
        renewerThreads, renewerThreads, 3L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>());
    renewerPool.setThreadFactory(renewerFactory);
    renewerPool.allowCoreThreadTimeOut(true);
    return renewerPool;
}
/**
 * Builds the executor on which delegation tokens are renewed.
 *
 * <p>Core and maximum size are both taken from
 * {@code RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT}. Threads carry the name
 * {@code "DelegationTokenRenewer #<n>"} and idle threads, core ones too, may
 * terminate after a 3 second keep-alive.
 *
 * @param conf the configuration the thread count is read from
 * @return the configured thread pool
 */
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
    final int size = conf.getInt(
        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
        YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);

    final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
    final ThreadFactory namedFactory =
        factoryBuilder.setNameFormat("DelegationTokenRenewer #%d").build();

    final long keepAliveSeconds = 3L;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        size, size, keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    executor.setThreadFactory(namedFactory);
    executor.allowCoreThreadTimeOut(true);
    return executor;
}
/**
 * getThreadFactory reports exactly the factory that was last handed to
 * setThreadFactory.
 */
public void testSetThreadFactory() {
    final ThreadPoolExecutor customPool = new CustomTPE(
        1, 2,
        LONG_DELAY_MS, MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner ignored = cleaner(customPool)) {
        final ThreadFactory expected = new SimpleThreadFactory();
        customPool.setThreadFactory(expected);
        final ThreadFactory actual = customPool.getThreadFactory();
        assertSame(expected, actual);
    }
}
private ReadaheadPool() {
pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
pool.setThreadFactory(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Readahead Thread #%d")
.build());
}
/**
 * Passing null to setThreadFactory is rejected with a NullPointerException.
 */
public void testSetThreadFactoryNull() {
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner ignored = cleaner(pool)) {
        try {
            pool.setThreadFactory(null);
            // Reaching this line means the expected exception was not raised.
            shouldThrow();
        } catch (NullPointerException expected) {
            // This is the outcome the test is looking for.
        }
    }
}
/**
 * Starts the engine by creating its single-threaded executor with a bounded
 * queue and a rejection handler that folds queued captures into the rejected
 * one instead of dropping work.
 *
 * @param queueSize the capacity of the executor's work queue
 */
public void start(int queueSize) {
    // THREAD = 1
    //
    // The parallel work happens inside the compressor itself; here a fixed
    // processing order is wanted. Before using more than one thread, check
    // how the compressed data are sent over the network (!).
    //
    // QUEUESIZE = 1 - is more than one needed? The queue is full when either
    //
    // - the last capture could not be compressed before a new one became
    //   available => too many captures (!), or
    //
    // - the last compressed capture could not be sent because the network
    //   queue is full => too many captures (!).
    executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize));
    executor.setThreadFactory(new DefaultThreadFactoryEx("CompressorEngine"));
    executor.setRejectedExecutionHandler((rejected, pool) -> {
        if (pool.isShutdown()) {
            return;
        }

        // Empty the queue; drainTo yields the entries oldest first.
        final List<Runnable> queued = new ArrayList<>();
        pool.getQueue().drainTo(queued);

        final MyExecutable latest = (MyExecutable) rejected;
        final int queuedCount = queued.size();
        if (queuedCount > 0) {
            // Hand the drained captures to the merge newest first.
            final Capture[] newestFirst = new Capture[queuedCount];
            for (int i = 0; i < queuedCount; i++) {
                final MyExecutable pending = (MyExecutable) queued.get(queuedCount - 1 - i);
                newestFirst[i] = pending.capture;
            }
            latest.capture.mergeDirtyTiles(newestFirst);
        }

        // Resubmit the rejected task now that the queue has been emptied.
        pool.execute(latest);
    });
}
/**
 * Replaces the executor's thread factory with one that delegates to the
 * current factory but marks every thread it produces as a daemon.
 *
 * @param executor the executor whose thread factory is replaced
 */
@GwtIncompatible // TODO
private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
    final ThreadFactoryBuilder daemonBuilder = new ThreadFactoryBuilder()
        .setDaemon(true)
        .setThreadFactory(executor.getThreadFactory());
    executor.setThreadFactory(daemonBuilder.build());
}
/**
 * Wraps the executor's existing thread factory so that all threads created
 * from now on are daemon threads.
 *
 * @param executor the executor to reconfigure
 */
@GwtIncompatible // TODO
private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
    executor.setThreadFactory(
        new ThreadFactoryBuilder()
            .setDaemon(true)
            // Keep delegating actual thread creation to the current factory.
            .setThreadFactory(executor.getThreadFactory())
            .build());
}
/**
 * Creates and configures the update service for this game sync executor.
 * The returned executor is <b>unconfigurable</b>, meaning its configuration
 * can no longer be modified.
 *
 * <p>Tasks rejected by the pool are run on the submitting thread, and every
 * worker thread is named {@code "GameSyncThread"}.
 *
 * @param nThreads
 *            the number of threads to create this service with.
 * @return the newly created and configured service, or {@code null} when
 *         {@code nThreads} is 1 or less.
 */
private ExecutorService create(int nThreads) {
    if (nThreads <= 1) {
        return null;
    }

    final ThreadPoolExecutor syncPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
    syncPool.setRejectedExecutionHandler(new CallerRunsPolicy());

    final ThreadFactoryBuilder nameBuilder = new ThreadFactoryBuilder().setNameFormat("GameSyncThread");
    syncPool.setThreadFactory(nameBuilder.build());

    // Wrap the pool so callers cannot cast it back and reconfigure it.
    return Executors.unconfigurableExecutorService(syncPool);
}