下面列出了 java.util.concurrent.ThreadFactory 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the scheduler: a {@link ScheduledThreadPoolExecutor} with
 * {@code schedulerThreads} core threads whose workers are created through a
 * {@code NewThreadAction} run inside {@code AccessController.doPrivileged}.
 */
private RuntimeUtil() {
    final ThreadFactory schedulerThreadFactory = new ThreadFactory() {
        /** Sequence number embedded in each scheduler thread's name. */
        private final AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable runnable) {
            try {
                final String threadName = "Scheduler(" + count.getAndIncrement() + ")";
                return AccessController.doPrivileged(
                    new NewThreadAction(runnable, threadName, true));
            } catch (Throwable t) {
                // Any failure is logged and reported to the pool as "no thread"
                // (null) instead of being propagated.
                runtimeLog.log(Level.WARNING,
                    "scheduler thread factory throws", t);
                return null;
            }
        }
    };
    scheduler = new ScheduledThreadPoolExecutor(schedulerThreads, schedulerThreadFactory);
    /*
     * We would like to allow the scheduler's threads to terminate
     * if possible, but a bug in DelayQueue.poll can cause code
     * like this to result in a busy loop:
     */
    // stpe.setKeepAliveTime(10, TimeUnit.MINUTES);
    // stpe.allowCoreThreadTimeOut(true);
}
/**
 * Initializes the instance fields: the analytics tracker, the default shared
 * preferences and their editor, the profile and SSR-subscription managers
 * (each backed by its own new {@code DBHelper}), the resources handle, and a
 * scheduled thread pool with a core size of 10 whose threads are all named
 * {@code "co.tgbot.peekfun-thread"}.
 */
private void initVariable() {
    tracker = GoogleAnalytics.getInstance(this).newTracker(R.xml.tracker);

    settings = PreferenceManager.getDefaultSharedPreferences(this);
    editor = settings.edit();

    profileManager = new ProfileManager(new DBHelper(this));
    ssrsubManager = new SSRSubManager(new DBHelper(this));

    resources = getResources();

    // Every pool thread gets the same fixed name.
    final ThreadFactory namedThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable r) {
            final Thread worker = new Thread(r);
            worker.setName("co.tgbot.peekfun-thread");
            return worker;
        }
    };
    mThreadPool = new ScheduledThreadPoolExecutor(10, namedThreadFactory);
}
/**
 * Creates a background service backed by a scheduled thread pool.
 *
 * <p>Worker threads are daemons, belong to a new {@link ThreadGroup} named
 * after the service, and are named {@code serviceName#<n>}.
 *
 * @param serviceName name of the service; used for the thread group and thread names
 * @param interval stored interval value
 * @param unit time unit stored alongside {@code interval}
 * @param threadPoolSize core size of the scheduled thread pool
 * @param serviceTimeout stored service timeout value
 */
public BackgroundService(String serviceName, long interval,
    TimeUnit unit, int threadPoolSize, long serviceTimeout) {
  this.serviceName = serviceName;
  this.interval = interval;
  this.unit = unit;
  this.serviceTimeout = serviceTimeout;

  threadGroup = new ThreadGroup(serviceName);
  threadFactory = new ThreadFactoryBuilder()
      .setNameFormat(serviceName + "#%d")
      .setDaemon(true)
      // Base factory: places every worker in this service's thread group.
      .setThreadFactory(runnable -> new Thread(threadGroup, runnable))
      .build();

  exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
  service = new PeriodicalTask();
}
/**
 * Creates a table cache for the given cleanup policy.
 *
 * @param cleanupPolicy policy that selects the backing map: a sorted
 *     skip-list map for {@code NEVER}, a hash map otherwise
 */
public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
  this.cleanupPolicy = cleanupPolicy;

  // Only the full table cache (policy NEVER) needs its elements kept in
  // sorted order, which makes listing easy; every other policy can use a
  // plain hash map.
  if (cleanupPolicy != CacheCleanupPolicy.NEVER) {
    cache = new ConcurrentHashMap<>();
  } else {
    cache = new ConcurrentSkipListMap<>();
  }
  epochEntries = new ConcurrentSkipListSet<>();

  // A single-thread executor guarantees that at most one cleanup runs at a
  // time.
  ThreadFactory cleanupThreadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("PartialTableCache Cleanup Thread - %d")
      .build();
  executorService = Executors.newSingleThreadExecutor(cleanupThreadFactory);
}
/**
 * Builds a {@link java.util.concurrent.ThreadFactory} whose threads are named
 * {@code <prefix>-pool<P>-t<T>}, where {@code P} is a number drawn from
 * {@code poolNumber} once per factory and {@code T} is a per-factory counter
 * starting at 1.
 *
 * <p>Threads are created in the security manager's thread group when a
 * security manager is installed, otherwise in the calling thread's group.
 *
 * @param prefix The prefix of every created Thread's name
 * @return a {@link java.util.concurrent.ThreadFactory} that names threads
 */
public static ThreadFactory getNamedThreadFactory(final String prefix) {
  final SecurityManager securityManager = System.getSecurityManager();
  final ThreadGroup threadGroup;
  if (securityManager == null) {
    threadGroup = Thread.currentThread().getThreadGroup();
  } else {
    threadGroup = securityManager.getThreadGroup();
  }

  return new ThreadFactory() {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final int poolNum = poolNumber.getAndIncrement();

    @Override
    public Thread newThread(Runnable r) {
      final int threadId = threadNumber.getAndIncrement();
      return new Thread(threadGroup, r, prefix + "-pool" + poolNum + "-t" + threadId);
    }
  };
}
/**
 * Stores the download parameters and runs this object's {@code runnable} on
 * a dedicated single-thread executor.
 *
 * <p>The executor is created per call, so it is shut down right after the
 * task is submitted. {@code shutdown()} still lets the already-submitted task
 * run to completion, and then the worker thread exits. Without it, every call
 * would leave a non-daemon core thread alive indefinitely.
 *
 * @param apkUrl URL stored for the download
 * @param apkName file name stored for the download
 * @param listener listener stored for the download
 */
@Override
public void download(String apkUrl, String apkName, OnDownloadListener listener) {
    this.apkUrl = apkUrl;
    this.apkName = apkName;
    this.listener = listener;
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(Constant.THREAD_NAME);
            return thread;
        }
    });
    try {
        executor.execute(runnable);
    } finally {
        // Orderly shutdown: the submitted task still runs, no new tasks are
        // accepted, and the worker thread terminates afterwards.
        executor.shutdown();
    }
}
/**
 * Starts a single-thread scheduled executor, whose thread is named
 * {@code "Throttle Cleanup Task"}, and schedules a {@code CleanupTask} on it
 * at a fixed rate. The configured throttle frequency, in seconds, is used as
 * both the initial delay and the period.
 */
public ThrottleDataCleanUpTask() {
    ThreadFactory cleanupThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread cleanupThread = new Thread(r);
            cleanupThread.setName("Throttle Cleanup Task");
            return cleanupThread;
        }
    };
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, cleanupThreadFactory);

    int throttleFrequency = ThrottleConfigHolder.getInstance().getThrottleFrequency();
    if (log.isDebugEnabled()) {
        log.debug("Throttling Cleanup Task Frequency set to " + throttleFrequency);
    }

    executor.scheduleAtFixedRate(new CleanupTask(), throttleFrequency, throttleFrequency, TimeUnit.SECONDS);
}
/**
 * Verifies that {@code destroy()} calls {@code shutdownNow()} on the executor
 * returned by {@code createExecutor} when the factory bean is left with its
 * default shutdown settings.
 */
@Test
@SuppressWarnings("serial")
public void testShutdownNowIsPropagatedToTheExecutorOnDestroy() throws Exception {
    final ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);

    // Factory bean that hands out the mock instead of building a real pool.
    ScheduledExecutorFactoryBean factoryBean = new ScheduledExecutorFactoryBean() {
        @Override
        protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            return mockExecutor;
        }
    };
    ScheduledExecutorTask[] tasks = {new NoOpScheduledExecutorTask()};
    factoryBean.setScheduledExecutorTasks(tasks);

    factoryBean.afterPropertiesSet();
    factoryBean.destroy();

    verify(mockExecutor).shutdownNow();
}
/**
 * Builds a factory of daemon threads named {@code <processName>-<n>} whose
 * uncaught exceptions are logged at error level and then re-propagated.
 *
 * @param processName prefix used in every thread name
 * @return the configured thread factory
 */
public static ThreadFactory newGenericThreadFactory(String processName)
{
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder()
        .setNameFormat(processName + "-%d")
        .setDaemon(true);
    return builder
        .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
        {
            @Override
            public void uncaughtException(Thread t, Throwable e)
            {
                log.error("Unexpected exception in thread: " + t, e);
                // NOTE(review): the Thread.UncaughtExceptionHandler contract says
                // exceptions thrown from this method are ignored by the JVM, so
                // this rethrow likely has no effect when the JVM invokes it.
                Throwables.propagate(e);
            }
        })
        .build();
}
/**
 * Starts a Disruptor with a 1024-slot ring buffer and a single
 * {@code TestEventHandler}, then publishes an increasing {@code long}
 * counter once per second. The publishing loop has no exit condition.
 */
public void test() throws Exception {
    Disruptor<TestEvent> disruptor =
        new Disruptor<>(new TestEventFactory(), 1024, Executors.defaultThreadFactory());
    disruptor.handleEventsWith(new TestEventHandler());
    disruptor.start();

    TestEventProducerWithTranslator producer =
        new TestEventProducerWithTranslator(disruptor.getRingBuffer());

    // One reusable 8-byte buffer; the counter is overwritten at offset 0.
    ByteBuffer payload = ByteBuffer.allocate(8);
    long sequence = 0;
    while (true) {
        payload.putLong(0, sequence);
        producer.onData(payload);
        Thread.sleep(1000);
        sequence++;
    }
}
/**
 * Creates the scheduler: a {@link ScheduledThreadPoolExecutor} with
 * {@code schedulerThreads} core threads whose workers are created through a
 * {@code NewThreadAction} run inside {@code AccessController.doPrivileged}.
 */
private RuntimeUtil() {
    final ThreadFactory privilegedFactory = new ThreadFactory() {
        /** Sequence number embedded in each scheduler thread's name. */
        private final AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable runnable) {
            try {
                final String threadName = "Scheduler(" + count.getAndIncrement() + ")";
                return AccessController.doPrivileged(
                    new NewThreadAction(runnable, threadName, true));
            } catch (Throwable t) {
                // Any failure is logged and reported to the pool as "no thread"
                // (null) instead of being propagated.
                runtimeLog.log(Level.WARNING,
                    "scheduler thread factory throws", t);
                return null;
            }
        }
    };
    scheduler = new ScheduledThreadPoolExecutor(schedulerThreads, privilegedFactory);
    /*
     * We would like to allow the scheduler's threads to terminate
     * if possible, but a bug in DelayQueue.poll can cause code
     * like this to result in a busy loop:
     */
    // stpe.setKeepAliveTime(10, TimeUnit.MINUTES);
    // stpe.allowCoreThreadTimeOut(true);
}
/**
 * Creates the registry, installs the AppOptics naming convention and a meter
 * filter that prefixes {@code "system."} meter names with
 * {@code "micrometer."}, then starts the registry with the given thread
 * factory.
 *
 * @param config registry configuration
 * @param clock clock passed to the superclass
 * @param threadFactory factory passed to {@code start}
 * @param httpClient HTTP sender stored on this registry
 */
protected AppOpticsMeterRegistry(AppOpticsConfig config, Clock clock, ThreadFactory threadFactory, HttpSender httpClient) {
    super(config, clock);

    config().namingConvention(new AppOpticsNamingConvention());

    this.config = config;
    this.httpClient = httpClient;

    config().meterFilter(new MeterFilter() {
        @Override
        public Meter.Id map(Meter.Id id) {
            final boolean isSystemMeter = id.getName().startsWith("system.");
            return isSystemMeter ? id.withName("micrometer." + id.getName()) : id;
        }
    });

    start(threadFactory);
}
/**
 * Initializes the instance fields: the analytics tracker, the default shared
 * preferences and their editor, the profile and SSR-subscription managers
 * (each backed by its own new {@code DBHelper}), the resources handle, and a
 * scheduled thread pool with a core size of 10 whose threads are all named
 * {@code "shadowsocks-thread"}.
 */
private void initVariable() {
    tracker = GoogleAnalytics.getInstance(this).newTracker(R.xml.tracker);

    settings = PreferenceManager.getDefaultSharedPreferences(this);
    editor = settings.edit();

    profileManager = new ProfileManager(new DBHelper(this));
    ssrsubManager = new SSRSubManager(new DBHelper(this));

    resources = getResources();

    // Every pool thread gets the same fixed name.
    final ThreadFactory namedThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable r) {
            final Thread worker = new Thread(r);
            worker.setName("shadowsocks-thread");
            return worker;
        }
    };
    mThreadPool = new ScheduledThreadPoolExecutor(10, namedThreadFactory);
}
/**
 * Creates the registry with the supplied client provider, or with a default
 * one chosen from {@code config.clientProviderType()} when none is given.
 *
 * @param config registry configuration
 * @param clientProvider provider to use; when {@code null}, an Insights agent
 *     or Insights API provider is created from {@code config}
 * @param namingConvention convention applied through {@code config()}
 * @param clock clock passed to the superclass
 * @param threadFactory factory passed to {@code start}
 */
NewRelicMeterRegistry(NewRelicConfig config, @Nullable NewRelicClientProvider clientProvider,
        NamingConvention namingConvention, Clock clock, ThreadFactory threadFactory) {
    super(config, clock);

    if (clientProvider != null) {
        this.clientProvider = clientProvider;
    } else if (config.clientProviderType() == ClientProviderType.INSIGHTS_AGENT) {
        this.clientProvider = new NewRelicInsightsAgentClientProvider(config);
    } else {
        // Default to the Insights API client provider if not specified in
        // config or provided.
        this.clientProvider = new NewRelicInsightsApiClientProvider(config);
    }

    // Config wrapper that forwards naming-convention changes to the client
    // provider before delegating to the superclass.
    thisConfig = new Config() {
        @Override
        public Config namingConvention(NamingConvention convention) {
            NewRelicMeterRegistry.this.clientProvider.setNamingConvention(convention);
            return super.namingConvention(convention);
        }
    };

    config().namingConvention(namingConvention);
    start(threadFactory);
}
/**
 * Creates a reporter from explicitly supplied collaborators and builds the
 * {@code MetricExporter} from the same queue, writer and thread factory.
 *
 * @param metricWriter writer stored on this reporter and given to the exporter
 * @param writeInterval write interval; must be greater than zero
 * @param threadFactory thread factory stored on this reporter and given to the exporter
 * @param metricRegistry registry stored on this reporter
 * @param writeQueue queue stored on this reporter and given to the exporter
 * @throws IllegalArgumentException presumably, via {@code checkArgument},
 *     when {@code writeInterval} is not positive — confirm against the
 *     imported {@code checkArgument}
 */
@VisibleForTesting
MetricReporter(
    MetricWriter metricWriter,
    long writeInterval,
    ThreadFactory threadFactory,
    MetricRegistry metricRegistry,
    BlockingQueue<Optional<ImmutableList<MetricPoint<?>>>> writeQueue) {
  checkArgument(writeInterval > 0, "writeInterval must be greater than zero");

  this.writeInterval = writeInterval;
  this.metricRegistry = metricRegistry;
  this.metricWriter = metricWriter;
  this.writeQueue = writeQueue;
  this.threadFactory = threadFactory;

  this.metricExporter = new MetricExporter(writeQueue, metricWriter, threadFactory);
}
/**
 * Creates the executor service that serializes split functions' ASTs in memory.
 *
 * <p>The pool uses an unbounded queue, so any number of tasks can be pending.
 * Core and maximum size are equal: the {@code nashorn.serialize.threads}
 * property, defaulting to half the available processors, and never less than
 * one. Core threads may time out after one idle minute, so the pool can shrink
 * to nothing when there is no work. Threads are daemons and run one step below
 * normal priority, because serialization is only a memory-conservation measure
 * (it lets the AST be released) and should not compete with the running program.
 *
 * @return an executor service with the characteristics described above.
 */
private static ExecutorService createAstSerializerExecutorService() {
    final int defaultThreads = Runtime.getRuntime().availableProcessors() / 2;
    final int threads = Math.max(1, Options.getIntProperty("nashorn.serialize.threads", defaultThreads));

    final ThreadFactory serializerThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(final Runnable r) {
            final Thread serializerThread = new Thread(r, "Nashorn AST Serializer");
            serializerThread.setDaemon(true);
            serializerThread.setPriority(Thread.NORM_PRIORITY - 1);
            return serializerThread;
        }
    };

    final ThreadPoolExecutor service = new ThreadPoolExecutor(
            threads, threads, 1L, TimeUnit.MINUTES,
            new LinkedBlockingDeque<Runnable>(), serializerThreadFactory);
    service.allowCoreThreadTimeOut(true);
    return service;
}
/**
 * Creates an AsyncDiskService for a set of volumes, identified by their root
 * directories.
 *
 * <p>Each volume gets its own thread pool for asynchronous disk operations.
 * All pools share one thread factory that creates threads in
 * {@code threadGroup}.
 *
 * @param volumes The roots of the file system volumes.
 */
public AsyncDiskService(String[] volumes) {
  threadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(threadGroup, r);
    }
  };

  // One ThreadPool per volume, keyed by the volume root.
  for (String volume : volumes) {
    ThreadPoolExecutor volumeExecutor = new ThreadPoolExecutor(
        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), threadFactory);

    // Letting core threads time out can reduce the number of running threads.
    volumeExecutor.allowCoreThreadTimeOut(true);
    executors.put(volume, volumeExecutor);
  }
}
/**
 * Verifies that {@code destroy()} calls {@code shutdown()} on the executor
 * returned by {@code createExecutor} when
 * {@code waitForTasksToCompleteOnShutdown} is set to {@code true}.
 */
@Test
@SuppressWarnings("serial")
public void testShutdownIsPropagatedToTheExecutorOnDestroy() throws Exception {
    final ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);

    // Factory bean that hands out the mock instead of building a real pool.
    ScheduledExecutorFactoryBean factoryBean = new ScheduledExecutorFactoryBean() {
        @Override
        protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            return mockExecutor;
        }
    };
    ScheduledExecutorTask[] tasks = {new NoOpScheduledExecutorTask()};
    factoryBean.setScheduledExecutorTasks(tasks);
    factoryBean.setWaitForTasksToCompleteOnShutdown(true);

    factoryBean.afterPropertiesSet();
    factoryBean.destroy();

    verify(mockExecutor).shutdown();
}
/**
 * Default executor group: twice as many threads as available processors,
 * created as daemons named {@code imap-executor-<n>}, with uncaught
 * exceptions logged to the {@code "imap-executor"} logger.
 *
 * @return a new {@code DefaultEventExecutorGroup}
 */
@Default
default EventExecutorGroup executor() {
  Logger logger = LoggerFactory.getLogger("imap-executor");
  Thread.UncaughtExceptionHandler logUncaught =
      (t, e) -> logger.error("Uncaught exception on thread {}", t.getName(), e);

  ThreadFactory threadFactory = new ThreadFactoryBuilder()
      .setNameFormat("imap-executor-%d")
      .setDaemon(true)
      .setUncaughtExceptionHandler(logUncaught)
      .build();

  int nThreads = Runtime.getRuntime().availableProcessors() * 2;
  return new DefaultEventExecutorGroup(nThreads, threadFactory);
}
/**
 * Creates the single-thread scheduler, whose thread is named
 * {@code "RestoreFromCooldownScheduler"}, and schedules
 * {@code restoreFromCooldown(null)} every 2 seconds after an initial
 * 2-second delay.
 */
private void initScheduler() {
    final ThreadFactory schedulerThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "RestoreFromCooldownScheduler");
        }
    };
    final Runnable restoreTask = new Runnable() {
        @Override
        public void run() {
            restoreFromCooldown(null);
        }
    };

    restoreFromCooldownScheduler = Executors.newSingleThreadScheduledExecutor(schedulerThreadFactory);
    restoreFromCooldownScheduler.scheduleAtFixedRate(restoreTask, 2, 2, TimeUnit.SECONDS);
}
/**
 * Creates the scheduled executor, applies the remove-on-cancel policy when
 * requested and supported, registers any configured tasks, and records the
 * executor to expose (optionally wrapped so it cannot be reconfigured).
 *
 * @param threadFactory factory passed to {@code createExecutor}
 * @param rejectedExecutionHandler handler passed to {@code createExecutor}
 * @return the raw executor that was created, not the exposed wrapper
 */
@Override
protected ExecutorService initializeExecutor(
		ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

	ScheduledExecutorService executor =
			createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

	// Remove-on-cancel can only be set on a ScheduledThreadPoolExecutor.
	boolean supportsRemoveOnCancel = (executor instanceof ScheduledThreadPoolExecutor);
	if (this.removeOnCancelPolicy && supportsRemoveOnCancel) {
		((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
	}
	else if (this.removeOnCancelPolicy) {
		logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
	}

	// Register specified ScheduledExecutorTasks, if necessary.
	if (!ObjectUtils.isEmpty(this.scheduledExecutorTasks)) {
		registerTasks(this.scheduledExecutorTasks, executor);
	}

	// Wrap executor with an unconfigurable decorator, if requested.
	if (this.exposeUnconfigurableExecutor) {
		this.exposedExecutor = Executors.unconfigurableScheduledExecutorService(executor);
	}
	else {
		this.exposedExecutor = executor;
	}

	return executor;
}
/**
 * Creates a single-thread pool with a bounded queue whose one core thread is
 * allowed to time out after 5 seconds of idleness.
 *
 * @param queueSize capacity of the task queue
 * @param threadName name given to the fallback {@code DefaultThreadFactory};
 *     used only when {@code threadFactory} is {@code null}
 * @param threadFactory factory to use, or {@code null} for the fallback
 * @param policy handler for rejected tasks
 * @return the configured executor
 */
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy) {
    final ThreadFactory effectiveFactory;
    if (threadFactory != null) {
        effectiveFactory = threadFactory;
    } else {
        effectiveFactory = new DefaultThreadFactory(threadName, true);
    }

    final LinkedBlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(queueSize);
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 5, SECONDS, boundedQueue, effectiveFactory, policy);
    pool.allowCoreThreadTimeOut(true);
    return pool;
}
/**
 * Shuts the executor down immediately and waits up to {@code timeoutMillis}
 * for it to terminate, logging an error when it does not.
 *
 * <p>If the wait is interrupted, the interrupt flag is restored and the
 * executor's current termination state is used instead. The error message
 * names the pool by its {@code NamingThreadFactory} prefix when available,
 * otherwise by the simple class name of the factory or executor.
 *
 * @param executor executor to stop; {@code null} is ignored
 * @param timeoutMillis maximum time to wait for termination, in milliseconds
 */
public static void shutdown(ScheduledExecutorService executor, long timeoutMillis) {
  if (executor == null) {
    return;
  }

  executor.shutdownNow();

  boolean terminated;
  try {
    terminated = executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    terminated = executor.isTerminated();
  }
  if (terminated) {
    return;
  }

  // Work out the best available name for the pool that failed to stop.
  final String poolName;
  if (!(executor instanceof ScheduledThreadPoolExecutor)) {
    poolName = "unknown[" + executor.getClass().getSimpleName() + "]";
  } else {
    ThreadFactory factory = ((ScheduledThreadPoolExecutor) executor).getThreadFactory();
    if (factory instanceof NamingThreadFactory) {
      poolName = ((NamingThreadFactory) factory).getPrefix();
    } else {
      poolName = "unknown[" + factory.getClass().getSimpleName() + "]";
    }
  }
  LOG.error("executor did not terminate in the specified time: {}", poolName);
}
/**
 * Creates a {@code NioEventLoopGroup} backed by daemon threads.
 *
 * <p>The group has {@code numEventLoops} threads, or twice the number of
 * available processors when {@code numEventLoops} is zero.
 *
 * @return the new event loop group
 */
@Override
public EventLoopGroup create() {
    final int parallelism;
    if (numEventLoops == 0) {
        parallelism = Runtime.getRuntime().availableProcessors() * 2;
    } else {
        parallelism = numEventLoops;
    }

    // Use Netty's DefaultThreadFactory in order to get the benefit of
    // FastThreadLocal; the second argument requests daemon threads.
    ThreadFactory daemonThreadFactory = new DefaultThreadFactory(name, true);
    return new NioEventLoopGroup(parallelism, daemonThreadFactory);
}
/**
 * Initializes the servlet and creates its executor: a fixed pool of 4 daemon
 * threads named {@code ReportServlet-<n>} with an unbounded queue and a
 * one-minute keep-alive.
 *
 * @param config servlet configuration passed to the superclass
 * @throws ServletException if the superclass initialization fails
 */
@Override
public void init(ServletConfig config) throws ServletException {
  super.init(config);

  String nameFormat = ReportServlet.class.getSimpleName() + "-%d";
  ThreadFactory daemonThreadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat(nameFormat)
      .build();

  this.executor = new ThreadPoolExecutor(
      4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), daemonThreadFactory);
}
/**
 * Default global thread factory.
 *
 * <p>The factory is created on first use and then reused. It produces daemon
 * threads at normal priority named {@code def-pool-<n>}. The method is
 * synchronized, so concurrent callers do not race on the lazy initialization.
 *
 * @return ThreadFactory
 */
public synchronized ThreadFactory defaultThreadFactory() {
    if (threadFactory != null) {
        return threadFactory;
    }
    threadFactory = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.NORM_PRIORITY)
            .setNameFormat("def-pool-%d")
            .build();
    return threadFactory;
}
/**
 * getThreadFactory returns the factory passed to the constructor when no
 * other factory has been set.
 */
public void testGetThreadFactory() {
    final ThreadFactory expectedFactory = new SimpleThreadFactory();
    final CustomExecutor pool = new CustomExecutor(1, expectedFactory);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertSame(expectedFactory, pool.getThreadFactory());
    }
}
/**
 * Creates a new single-threaded executor service for performing the
 * callbacks. Its thread is a daemon named
 * {@code "java-sdk-progress-listener-callback-thread"}.
 */
private static ExecutorService createNewExecutorService() {
    final ThreadFactory callbackThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            final Thread callbackThread = new Thread(r);
            callbackThread.setName("java-sdk-progress-listener-callback-thread");
            callbackThread.setDaemon(true);
            return callbackThread;
        }
    };
    return Executors.newSingleThreadExecutor(callbackThreadFactory);
}
/**
 * Creates the executor on top of a new {@code ExecutorQueue} and computes the
 * limit on concurrently submitted tasks.
 *
 * <p>The limit is {@code queueCapacity + maxThreads}. The sum is computed as
 * a {@code long} and clamped to {@link Integer#MAX_VALUE}, so a very large
 * {@code queueCapacity} (for example {@code Integer.MAX_VALUE}) cannot
 * overflow into a negative limit.
 *
 * @param coreThreads core pool size
 * @param maxThreads maximum pool size
 * @param keepAliveTime keep-alive time for idle threads
 * @param unit time unit of {@code keepAliveTime}
 * @param queueCapacity number of tasks allowed to wait beyond {@code maxThreads}
 * @param threadFactory factory used to create worker threads
 * @param handler handler for rejected tasks
 */
public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit,
        int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory, handler);
    // Give the queue a back-reference to this executor.
    ((ExecutorQueue) getQueue()).setStandardThreadExecutor(this);
    submittedTasksCount = new AtomicInteger(0);
    // Maximum concurrent task limit: queue buffer size + maximum thread count.
    // Summed in long arithmetic and clamped to avoid int overflow.
    maxSubmittedTaskCount = (int) Math.min((long) queueCapacity + maxThreads, Integer.MAX_VALUE);
}
/**
 * Verifies that a thread factory supplied through the event loop group
 * builder is used: after one simple request and closing the client, the
 * spied factory must have been asked for at least one thread.
 */
@Test
public void customFactoryIsUsed() throws Exception {
    ThreadFactory spiedFactory = spy(new CustomThreadFactory());

    SdkEventLoopGroup.Builder eventLoopGroupBuilder =
        SdkEventLoopGroup.builder().threadFactory(spiedFactory);
    SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder()
                                                       .eventLoopGroupBuilder(eventLoopGroupBuilder)
                                                       .build();

    makeSimpleRequest(client);
    client.close();

    Mockito.verify(spiedFactory, atLeastOnce()).newThread(Mockito.any());
}