下面列出了怎么用com.google.common.util.concurrent.ThreadFactoryBuilder的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void init() {
String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace);
LOG.info("Initializing SamzaEventHubClientManager for namespace: " + eventHubNamespace);
try {
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder()
.setNamespaceName(eventHubNamespace)
.setEventHubName(entityPath)
.setSasKeyName(sasKeyName)
.setSasKey(sasKey);
ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat("Samza EventHubClient Thread-%d").setDaemon(true);
eventHubClientExecutor = Executors.newFixedThreadPool(numClientThreads, threadFactoryBuilder.build());
eventHubClient = EventHubClient.createSync(connectionStringBuilder.toString(), retryPolicy, eventHubClientExecutor);
} catch (IOException | EventHubException e) {
String msg = String.format("Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d",
entityPath, remoteHost, ClientConstants.AMQPS_PORT);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
LOG.info("SamzaEventHubClientManager initialized for namespace: " + eventHubNamespace);
}
private static List<ThreadPoolExecutor> createChunkExecutors(
ConfigurationSource conf) {
// TODO create single pool with N threads if using non-incremental chunks
final int threadCountPerDisk = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
final int numberOfDisks =
MutableVolumeSet.getDatanodeStorageDirs(conf).size();
ThreadPoolExecutor[] executors =
new ThreadPoolExecutor[threadCountPerDisk * numberOfDisks];
for (int i = 0; i < executors.length; i++) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ChunkWriter-" + i + "-%d")
.build();
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
executors[i] = new ThreadPoolExecutor(1, 1,
0, TimeUnit.SECONDS, workQueue, threadFactory);
}
return ImmutableList.copyOf(executors);
}
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler, Context context) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
this.dispatcher = dispatcher;
this.delService = delService;
this.dirsHandler = dirsHandler;
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
this.stateStore = context.getNMStateStore();
this.nmContext = context;
}
public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
// As for full table cache only we need elements to be inserted in sorted
// manner, so that list will be easy. For other we can go with Hash map.
if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
cache = new ConcurrentSkipListMap<>();
} else {
cache = new ConcurrentHashMap<>();
}
epochEntries = new ConcurrentSkipListSet<>();
// Created a singleThreadExecutor, so one cleanup will be running at a
// time.
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("PartialTableCache Cleanup Thread - %d").build();
executorService = Executors.newSingleThreadExecutor(build);
this.cleanupPolicy = cleanupPolicy;
}
public ThreadSharingExecutor(Config config, TWSChannel ch, ExecutionPlan plan,
IExecutionHook hook) {
this.config = config;
this.channel = ch;
this.numThreads = ExecutorContext.threadsPerContainer(config);
Thread.UncaughtExceptionHandler hndler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
throw new RuntimeException(ex);
}
};
this.threads = Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setNameFormat("executor-%d")
.setDaemon(true)
.setUncaughtExceptionHandler(hndler)
.build());
this.executionPlan = plan;
this.executionHook = hook;
}
public ClientBuilder() {
enableGZip = true;
name = "hosebird-client-" + clientNum.getAndIncrement();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("hosebird-client-io-thread-%d")
.build();
executorService = Executors.newSingleThreadExecutor(threadFactory);
ThreadFactory rateTrackerThreadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("hosebird-client-rateTracker-thread-%d")
.build();
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1, rateTrackerThreadFactory);
rateTracker = new BasicRateTracker(30000, 100, true, scheduledExecutor);
reconnectionManager = new BasicReconnectionManager(5);
socketTimeoutMillis = 60000;
connectionTimeoutMillis = 4000;
schemeRegistry = SchemeRegistryFactory.createDefault();
}
public void run() throws IOException {
Preconditions.checkState(inputManager != null, "InputManager must be configured");
if (maxTimeToWaitForReportMillis > 0) {
reporterExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}")
.build());
Future reporterFuture = reporterExecutor.submit(new ReporterCallable());
}
ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback(), GuavaShim.directExecutor());
// Shutdown this executor once this task, and the callback complete.
schedulerExecutor.shutdown();
}
/**
* Constructor.
*
* @param monitorIntervalInSecs monitor interval in seconds.
* @param singerConfig the SingerConfig.
*/
protected DefaultLogMonitor(int monitorIntervalInSecs,
SingerConfig singerConfig)
throws ConfigurationException {
Preconditions.checkArgument(monitorIntervalInSecs > 0);
this.monitorIntervalInSecs = monitorIntervalInSecs;
this.processedLogStreams = Maps.newHashMap();
this.isStopped = true;
this.scheduledFuture = null;
this.logMonitorExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("LogMonitor").build());
if (singerConfig.isSetSingerRestartConfig() && singerConfig.singerRestartConfig.restartDaily) {
dailyRestart = true;
setDailyRestartTime(singerConfig.singerRestartConfig);
}
}
@SuppressWarnings("unchecked")
DbMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("DisruptorMessageStoreThread-%d").build();
disruptor = new Disruptor<>(DbOperation.getFactory(),
bufferSize, namedThreadFactory, ProducerType.MULTI, new
SleepingBlockingWaitStrategy());
disruptor.setDefaultExceptionHandler(new DbStoreExceptionHandler());
disruptor.handleEventsWith(new DbEventMatcher(bufferSize))
.then(new DbAccessHandler(messageDao, maxDbBatchSize))
.then(new FinalEventHandler());
disruptor.start();
this.messageDao = messageDao;
}
public OriginHealthStatusMonitor create(Id id, HealthCheckConfig healthCheckConfig, Supplier<OriginHealthCheckFunction> healthCheckFunction, HttpClient client) {
if (healthCheckConfig == null || !healthCheckConfig.isEnabled()) {
return new NoOriginHealthStatusMonitor();
}
ScheduledExecutorService executorService = newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(format("STYX-ORIGINS-MONITOR-%s", requireNonNull(id)))
.setDaemon(true)
.build());
ScheduledOriginHealthStatusMonitor healthStatusMonitor = new ScheduledOriginHealthStatusMonitor(
executorService,
healthCheckFunction.get(),
new Schedule(healthCheckConfig.intervalMillis(), MILLISECONDS),
client);
return new AnomalyExcludingOriginHealthStatusMonitor(healthStatusMonitor, healthCheckConfig.healthyThreshold(), healthCheckConfig.unhealthyThreshold());
}
private void initializeInputPreloadExecutorService() {
if( vcfInitializerThreads > 1) {
if( intervals.size() == 1) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("readerInitializer-thread-%d")
.setDaemon(true)
.build();
this.inputPreloadExecutorService = Executors.newFixedThreadPool(vcfInitializerThreads, threadFactory);
}
else {
logger.warn("GenomicsDBImport cannot use multiple VCF reader threads for initialization when the "
+ "number of intervals is greater than 1. Falling back to serial VCF reader initialization.");
inputPreloadExecutorService = null;
}
} else {
inputPreloadExecutorService = null;
}
}
@Override
public void start() {
LOGGER.info("Configuration provider starting");
Preconditions.checkState(file != null,
"The parameter file must not be null");
executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
.build());
FileWatcherRunnable fileWatcherRunnable =
new FileWatcherRunnable(file, counterGroup);
executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
TimeUnit.SECONDS);
lifecycleState = LifecycleState.START;
LOGGER.debug("Configuration provider started");
}
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
if (storageType.isTransient()) {
return null;
}
if (dataset.datanode == null) {
// FsVolumeImpl is used in test.
return null;
}
final int maxNumThreads = dataset.datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
.build();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, maxNumThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
workerFactory);
executor.allowCoreThreadTimeOut(true);
return executor;
}
/**
* The in-memory store bootstraps itself from the shared cache entries that
* exist in HDFS.
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.startTime = System.currentTimeMillis();
this.initialDelayMin = getInitialDelay(conf);
this.checkPeriodMin = getCheckPeriod(conf);
this.stalenessMinutes = getStalenessPeriod(conf);
bootstrap(conf);
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
.build();
scheduler = Executors.newSingleThreadScheduledExecutor(tf);
super.serviceInit(conf);
}
public AbstractFileMetricsReporter(
MetricRegistry registry,
SingularityS3Configuration configuration,
ObjectMapper metricsObjectMapper
) {
this.registry = registry;
this.configuration = configuration;
this.metricsObjectMapper = metricsObjectMapper;
this.fileReporterExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("metrics-file-reporter").build()
);
if (configuration.getMetricsFilePath().isPresent()) {
startFileReporter();
}
}
public ThreadPoolQueuedSink(
int jobQueueSize,
int corePoolSize,
int maxPoolSize,
long jobTimeout,
String threadFactoryName) {
jobQueue = new ArrayBlockingQueue<Runnable>(jobQueueSize == 0 ? 100 : jobQueueSize) {
@Override
public boolean offer(Runnable runnable) {
try {
put(runnable); // not to reject the task, slowing down
} catch (InterruptedException e) {
// do nothing
}
return true;
}
};
senders = new ThreadPoolExecutor(
corePoolSize == 0 ? 3 : corePoolSize,
maxPoolSize == 0 ? 10 : maxPoolSize,
10, TimeUnit.SECONDS,
jobQueue,
new ThreadFactoryBuilder().setNameFormat(threadFactoryName + "-Sender-%d").build());
this.jobTimeout = jobTimeout;
}
public InMemoryMessageBus(
String name,
Serializer<T> serializer,
Deserializer<T> deserializer
) {
this(
name,
serializer,
deserializer,
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(name + "-dispatcher")
.build()
)
);
}
@Bean
ListeningScheduledExecutorService cloudApiListeningScheduledExecutorService() {
return MoreExecutors
.listeningDecorator(new MDCCleanerScheduledExecutor(executorServicePoolSize,
new ThreadFactoryBuilder().setNameFormat("cloud-api-%d").build(),
new CallerRunsPolicy()));
}
@Override
protected void configUserThread() {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(this.getClass().getSimpleName())
.setDaemon(true)
.build();
UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory));
}
public FsDatasetCache(FsDatasetImpl dataset) {
this.dataset = dataset;
this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FsDatasetCache-%d-" + dataset.toString())
.build();
this.usedBytesCount = new UsedBytesCount();
this.uncachingExecutor = new ThreadPoolExecutor(
0, 1,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
workerFactory);
this.uncachingExecutor.allowCoreThreadTimeOut(true);
this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(
1, workerFactory);
this.revocationMs = dataset.datanode.getConf().getLong(
DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
long confRevocationPollingMs = dataset.datanode.getConf().getLong(
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
long minRevocationPollingMs = revocationMs / 2;
if (minRevocationPollingMs < confRevocationPollingMs) {
throw new RuntimeException("configured value " +
confRevocationPollingMs + "for " +
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS +
" is too high. It must not be more than half of the " +
"value of " + DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS +
". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
}
/**
* Returns a thread factory that produces threads with MAX_PRIORITY.
*
* @param factory backing ThreadFactory
* @return thread factory
*/
public static ThreadFactory maxPriority(ThreadFactory factory) {
return new ThreadFactoryBuilder()
.setThreadFactory(factory)
.setPriority(Thread.MAX_PRIORITY)
.build();
}
public ExecutorService getThreadPoolForTableScan() {
ExecutorService res = tableScanThreadPool;
if (res == null) {
synchronized (this) {
if (tableScanThreadPool == null) {
tableScanThreadPool = Executors.newFixedThreadPool(
conf.getTableScanConcurrency(),
new ThreadFactoryBuilder().setDaemon(true).build());
}
res = tableScanThreadPool;
}
}
return res;
}
private boolean executeChromosomeTask(final List<ChromosomeGeneTask> chrTasks, TaskType taskType)
{
chrTasks.forEach(x -> x.setTaskType(taskType));
if(mConfig.Threads <= 1)
{
chrTasks.forEach(x -> x.call());
return true;
}
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Isofox-%d").build();
ExecutorService executorService = Executors.newFixedThreadPool(mConfig.Threads, namedThreadFactory);
List<FutureTask> threadTaskList = new ArrayList<FutureTask>();
for(ChromosomeGeneTask chrGeneTask : chrTasks)
{
FutureTask futureTask = new FutureTask(chrGeneTask);
threadTaskList.add(futureTask);
executorService.execute(futureTask);
}
if(!checkThreadCompletion(threadTaskList))
{
mIsValid = false;
return false;
}
executorService.shutdown();
return true;
}
private boolean executeTasks(final List<FusionCohortTask> fusionTasks)
{
if(mConfig.Threads <= 1)
{
fusionTasks.forEach(x -> x.call());
return true;
}
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Isofox-%d").build();
ExecutorService executorService = Executors.newFixedThreadPool(mConfig.Threads, namedThreadFactory);
List<FutureTask> threadTaskList = new ArrayList<FutureTask>();
for(FusionCohortTask fusionTask : fusionTasks)
{
FutureTask futureTask = new FutureTask(fusionTask);
threadTaskList.add(futureTask);
executorService.execute(futureTask);
}
if(!checkThreadCompletion(threadTaskList))
return false;
executorService.shutdown();
return true;
}
public InMemoryPlatformMessageBus() {
super(
"platform-bus",
JSON.createSerializer(PlatformMessage.class),
JSON.createDeserializer(PlatformMessage.class),
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("platform-bus-dispatcher")
.build()
)
);
}
@Override
public void start() {
logger.info("Starting {}...", this);
sinkCounter.start();
super.start();
pathController.setBaseDirectory(directory);
if(rollInterval > 0){
rollService = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat(
"rollingFileSink-roller-" +
Thread.currentThread().getId() + "-%d").build());
/*
* Every N seconds, mark that it's time to rotate. We purposefully do NOT
* touch anything other than the indicator flag to avoid error handling
* issues (e.g. IO exceptions occuring in two different threads.
* Resist the urge to actually perform rotation in a separate thread!
*/
rollService.scheduleAtFixedRate(new Runnable() {
public void run() {
logger.debug("Marking time to rotate file {}",
pathController.getCurrentFile());
shouldRotate = true;
}
}, rollInterval, rollInterval, TimeUnit.SECONDS);
} else{
logger.info("RollInterval is not valid, file rolling will not happen.");
}
logger.info("RollingFileSink {} started.", getName());
}
public synchronized void start() {
log.info(this, cName + " LogAgent starting with directory: " + filePaths);
String OS = null;
if (System.getProperty("os.name").contains(OS_WINDOWS))
OS = OS_WINDOWS;
try {
// build log read engine
reader = new ReliableTaildirEventReader.Builder().filePaths(filePaths).headerTable(headerTable)
.positionFilePath(positionFilePath).skipToEnd(skipToEnd).addByteOffset(byteOffsetHeader)
.OperSystem(OS).build();
// registion
}
catch (IOException e) {
log.err(this, "Error instantiating ReliableTaildirEventReader", e);
return;
}
// build maintain status of file (if idle many times, close file.)
idleFileChecker = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), idleTimeout, checkIdleInterval,
TimeUnit.MILLISECONDS);
// build maintain file of position
positionWriter = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), writePosInitDelay, writePosInterval,
TimeUnit.MILLISECONDS);
log.debug(this, "LogAgent started");
}
/**
* Returns a thread factory that produces threads named according to the
* supplied name pattern.
*
* @param pattern name pattern
* @return thread factory
*/
public static ThreadFactory namedThreads(String pattern, Logger log) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern)
.setThreadFactory(new AtomixThreadFactory())
.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception on " + t.getName(), e))
.build();
}
public StreamingAllSharingExecutor2(Config cfg, int workerId, TWSChannel channel,
ExecutionPlan executionPlan, IExecutionHook hook) {
this.workerId = workerId;
this.config = cfg;
this.channel = channel;
this.numThreads = ExecutorContext.threadsPerContainer(config);
if (numThreads > 1) {
this.threads = Executors.newFixedThreadPool(numThreads - 1,
new ThreadFactoryBuilder().setNameFormat("executor-%d").setDaemon(true).build());
}
this.plan = executionPlan;
this.executionHook = hook;
}
@BeforeClass
public static void startZookeeper() throws Exception {
_testingServer = new TestingServer();
_rootCurator = CuratorFrameworkFactory.builder()
.connectString(_testingServer.getConnectString())
.retryPolicy(new RetryNTimes(3, 100))
.threadFactory(new ThreadFactoryBuilder().setNameFormat("test-%d").setDaemon(true).build())
.build();
_rootCurator.start();
}