下面列出了怎么用org.apache.commons.lang3.concurrent.BasicThreadFactory的API类实例代码及写法,或者点击链接到github查看源代码。
protected ExecutorWrapperImpl(
int numThreads,
final String name,
final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
final BasicThreadFactory.Builder factory =
new BasicThreadFactory.Builder().namingPattern(name + "-%d");
if (uncaughtExceptionHandler != null) {
factory.uncaughtExceptionHandler(uncaughtExceptionHandler);
} else {
factory.uncaughtExceptionHandler(
(thread, error) -> {
LOG.error("Thread " + thread.toString() + " failed to complete properly", error);
});
}
executorService = Executors.newScheduledThreadPool(numThreads, factory.build());
}
@Inject
private Executor(@Parameter(JobConf.ExecutorId.class) final String executorId,
final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
final MessageEnvironment messageEnvironment,
final SerializerManager serializerManager,
final IntermediateDataIOFactory intermediateDataIOFactory,
final BroadcastManagerWorker broadcastManagerWorker,
final MetricManagerWorker metricMessageSender) {
this.executorId = executorId;
this.executorService = Executors.newCachedThreadPool(new BasicThreadFactory.Builder()
.namingPattern("TaskExecutor thread-%d")
.build());
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
this.serializerManager = serializerManager;
this.intermediateDataIOFactory = intermediateDataIOFactory;
this.broadcastManagerWorker = broadcastManagerWorker;
this.metricMessageSender = metricMessageSender;
messageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID, new ExecutorMessageReceiver());
}
/**
* Close
*/
public void close() {
log.info("Closing TwitchClient ...");
// Modules
if (this.chat != null) {
this.chat.close();
}
if (this.pubsub != null) {
this.pubsub.close();
}
if (this.twitchClientHelper != null) {
twitchClientHelper.close();
}
// Shutdown ThreadPools created by Twitch4J
if (scheduledThreadPoolExecutor.getThreadFactory() instanceof BasicThreadFactory) {
BasicThreadFactory threadFactory = (BasicThreadFactory) scheduledThreadPoolExecutor.getThreadFactory();
if (threadFactory.getNamingPattern().equalsIgnoreCase("twitch4j-%d")) {
scheduledThreadPoolExecutor.shutdownNow();
}
}
}
@PostConstruct
public void init() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("SendNodeServerInfo-schedule-pool-%d").daemon(true).build());
scheduledExecutorService.scheduleAtFixedRate(() ->
{
//将负载加载到ZK中
if (!CollectionUtils.isEmpty(dataCenterChannelStore.getAllChannels())) {
dataCenterChannelStore.getAllChannels().stream().forEach(e -> {
log.info("channel id:{}, {}", e.id(), e);
});
}
applicationEventPublisher.publishEvent(
NodeServerInfoEvent.builder()
.name(goPushNodeServerConfig.getName())
.nodeServerInfo(watch())
.build());
// 写入zk 其实不需要发送 NodeInfoReq
nodeSender.send(NodeInfoReq.builder().build());
}
, delay, delay, TimeUnit.MILLISECONDS);
}
TaskPollExecutor(EurekaClient eurekaClient, TaskClient taskClient, int threadCount, int updateRetryCount,
String workerNamePrefix) {
this.eurekaClient = eurekaClient;
this.taskClient = taskClient;
this.updateRetryCount = updateRetryCount;
LOGGER.info("Initialized the TaskPollExecutor with {} threads", threadCount);
AtomicInteger count = new AtomicInteger(0);
this.executorService = Executors.newFixedThreadPool(threadCount,
new BasicThreadFactory.Builder()
.namingPattern(workerNamePrefix + count.getAndIncrement())
.uncaughtExceptionHandler(uncaughtExceptionHandler)
.build());
this.pollingSemaphore = new PollingSemaphore(threadCount);
}
@Inject
public SlaveService(ApolloConfiguration apolloConfiguration, SlaveDao slaveDao, EnvironmentDao environmentDao) {
this.slaveDao = requireNonNull(slaveDao);
this.environmentDao = requireNonNull(environmentDao);
this.apolloConfiguration = requireNonNull(apolloConfiguration);
isSlave = apolloConfiguration.getSlave().isSlave();
environmentIds = parseEnvironmentIds();
if (isSlave && isEmpty(environmentIds)) {
logger.error("Slave must be bundled with a valid list of environments! Bailing..");
throw new RuntimeException("Could not understand slaves params");
}
slaveId = apolloConfiguration.getSlave().getSlaveId();
keepaliveExecutorService = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder()
.namingPattern("slave-keepalive-pinger")
.build());
}
protected void initAsyncJobExecutionThreadPool() {
if (threadPoolQueue == null) {
LOGGER.info("Creating thread pool queue of size {}", queueSize);
threadPoolQueue = new ArrayBlockingQueue<>(queueSize);
}
if (executorService == null) {
LOGGER.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(threadPoolNamingPattern).build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS, threadPoolQueue, threadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
executorService = threadPoolExecutor;
}
}
/**
* 初始化应用数据
*/
private static void initDbData(final MyTvData data) {
final TvService tvService = new TvServiceImpl();
makeCache(tvService);
// 启动抓取任务
ExecutorService executorService = Executors
.newSingleThreadExecutor(new BasicThreadFactory.Builder()
.namingPattern("Mytv_Crawl_Task_%d").build());
executorService.execute(new Runnable() {
@Override
public void run() {
runCrawlTask(data, tvService);
}
});
executorService.shutdown();
// 启动每天定时任务
logger.info("create everyday crawl task.");
createEverydayCron(data, tvService);
}
public GremlinExecutor create() {
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-executor-default-%d").build();
final AtomicBoolean poolCreatedByBuilder = new AtomicBoolean();
final AtomicBoolean suppliedExecutor = new AtomicBoolean(true);
final AtomicBoolean suppliedScheduledExecutor = new AtomicBoolean(true);
final ExecutorService es = Optional.ofNullable(executorService).orElseGet(() -> {
poolCreatedByBuilder.set(true);
suppliedExecutor.set(false);
return Executors.newScheduledThreadPool(4, threadFactory);
});
executorService = es;
final ScheduledExecutorService ses = Optional.ofNullable(scheduledExecutorService).orElseGet(() -> {
// if the pool is created by the builder and we need another just re-use it, otherwise create
// a new one of those guys
suppliedScheduledExecutor.set(false);
return (poolCreatedByBuilder.get()) ?
(ScheduledExecutorService) es : Executors.newScheduledThreadPool(4, threadFactory);
});
scheduledExecutorService = ses;
return new GremlinExecutor(this, suppliedExecutor.get(), suppliedScheduledExecutor.get());
}
private void testConcurrency(BiConsumer<ExecutorService, Integer> taskCreator) throws InterruptedException {
ExecutorService s = Executors.newFixedThreadPool(NUMBER_OF_THREADS,
new BasicThreadFactory.Builder().daemon(true).uncaughtExceptionHandler((t, e) -> log.error(e.getMessage(), e)).build());
this.remainingDeleteSubmissions = new CountDownLatch(NUMBER_OF_TASKS);
for (int i = 0; i < NUMBER_OF_TASKS; i++) {
taskCreator.accept(s, i);
}
try {
assertTrue("Did not create all components in time", this.remainingDeleteSubmissions.await(100, TimeUnit.SECONDS));
s.shutdown();
assertTrue("Did not finish before timeout", s.awaitTermination(100, TimeUnit.SECONDS));
} finally {
s.shutdownNow();
}
}
/**
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService()
{
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build())
{
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
Threads.printException(r, t);
}
};
}
private ExecutorService createExecutorService(int threads)
{
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern(batch + "-thread-%d")
.build();
return Executors.newFixedThreadPool(threads, threadFactory);
}
private void verifyExecutorService(ExecutorService service)
{
ThreadPoolExecutor executorService = (ThreadPoolExecutor) service;
assertEquals(THREADS, executorService.getCorePoolSize());
assertTrue(executorService.isTerminated());
assertTrue(executorService.isShutdown());
assertThat(executorService.getThreadFactory(), instanceOf(BasicThreadFactory.class));
}
public static ExecutorService createPool(int poolSize, String name) {
logger.info("create threadPool:{name:{},poolSize:{}", name, poolSize);
return new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new BasicThreadFactory.Builder().daemon(true)
.namingPattern(name + "-%d").build());
}
/**
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService()
{
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build())
{
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
Threads.printException(r, t);
}
};
}
/**
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService()
{
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
}
/**
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService() {
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
log.error(ExceptionUtil.getMessage(t));
}
};
}
public static ScheduledExecutorService newScheduledThreadPool(int size,
String name) {
ThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern(name)
.build();
return Executors.newScheduledThreadPool(size, factory);
}
private static ThreadFactory buildThreadFactory()
{
return new BasicThreadFactory.Builder()
.daemon(true)
.priority(Thread.MIN_PRIORITY)
.build();
}
public static ExecutorService createExecutorService(int corePoolSize,String threadNamingPattern, int workQueue){
if (corePoolSize < 1){
corePoolSize = 5;
}
if (workQueue < 1){
workQueue = 50;
}
return new ThreadPoolExecutor(corePoolSize, corePoolSize * 10, 100L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(workQueue),
new BasicThreadFactory.Builder().namingPattern(threadNamingPattern).daemon(true).build(), new ThreadPoolExecutor.AbortPolicy());
}
public static ExecutorService createCacheExecutorService(int corePoolSize, int maximumPoolSize, String threadNamingPattern) {
if(corePoolSize < 1) {
corePoolSize = 5;
}
if(maximumPoolSize < 1) {
maximumPoolSize = 100;
}
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
new BasicThreadFactory.Builder().namingPattern(threadNamingPattern).daemon(true).build(), new ThreadPoolExecutor.AbortPolicy());
}
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService() {
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
ThreadUtils.printException(r, t);
}
};
}
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduleTaskExuctor() {
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()){
@Override
protected void afterExecute(Runnable r, Throwable t){
super.afterExecute(r, t);
Threads.printException(r, t);
}
};
}
/**
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService()
{
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
}
@PostConstruct
public void initHandlerStoreThread() {
if (handlerStore != null) {
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2, (new BasicThreadFactory.Builder()).build());
poolExecutor.scheduleAtFixedRate(new EventHandleStoreConsumer(), 0, 10, TimeUnit.SECONDS);
}
}
@PostConstruct
public void init() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("SendDataCenterInfo-schedule-pool-%d").daemon(true).build());
scheduledExecutorService.scheduleAtFixedRate(() -> applicationEventPublisher.publishEvent(DataCenterInfoEvent.builder()
.name(goPushDataCenterConfig.getName())
.dataCenterInfo(watch())
.build()), delay, delay, TimeUnit.MILLISECONDS);
}
protected ExecutorService createExecutor(int i, String s) {
return Executors.newFixedThreadPool(i,
new BasicThreadFactory.Builder()
.namingPattern(s)
.uncaughtExceptionHandler((thread, throwable) ->
errorLogger.log(L.WARN, throwable, ErrorContext.builder().build())
).build());
}
public SQLDB(
Supplier<UUID> serverUUIDSupplier,
Locale locale,
PlanConfig config,
RunnableFactory runnableFactory,
PluginLogger logger,
ErrorLogger errorLogger
) {
this.serverUUIDSupplier = serverUUIDSupplier;
this.locale = locale;
this.config = config;
this.runnableFactory = runnableFactory;
this.logger = logger;
this.errorLogger = errorLogger;
devMode = config.isTrue(PluginSettings.DEV_MODE);
this.transactionExecutorServiceProvider = () -> {
String nameFormat = "Plan " + getClass().getSimpleName() + "-transaction-thread-%d";
return Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder()
.namingPattern(nameFormat)
.uncaughtExceptionHandler((thread, throwable) -> {
if (devMode) {
errorLogger.log(L.WARN, throwable, ErrorContext.builder()
.whatToDo("THIS ERROR IS ONLY LOGGED IN DEV MODE")
.build());
}
}).build());
};
}
public static <T> ExecutorService createProcessors(int threadCount, BlockingQueue<T> queue, Consumer<T> consumer, String name) {
ThreadFactory factory = new BasicThreadFactory.Builder().namingPattern(name + "-%d").build();
ExecutorService result = Executors.newFixedThreadPool(threadCount, factory);
for (int i = 0; i < threadCount; i++) {
result.submit(new Processor(queue, consumer, name));
}
return result;
}
/**
* 给所有在线用户发送消息
*
* @param message
* @throws IOException
*/
public void broadcast(final TextMessage message) throws IOException {
// 多线程群发
for (Set<WebSocketSession> item : userSocketSessionMap.values()) {
for (final WebSocketSession session : item) {
if (session.isOpen()) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("socket-schedule-pool-%d").daemon(true)
.build());
for (int i = 0; i < 3; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
if (session.isOpen()) {
logger.debug("broadcast output:" + message.toString());
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
}
}
}