java.util.concurrent.ThreadPoolExecutor#setThreadFactory ( )源码实例Demo

下面列出了 java.util.concurrent.ThreadPoolExecutor#setThreadFactory() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: TrakEM2   文件: Utils.java
/**
 * Builds a fixed-size pool of {@code n_proc} threads whose workers are daemon threads
 * running at {@link Thread#NORM_PRIORITY}.
 * <p>
 * Workers are named {@code <prefix>-<k>}, where {@code k} counts up from 1 and
 * {@code <prefix>} is {@code namePrefix}, or the name of the creating thread's group
 * when {@code namePrefix} is {@code null}. Each worker joins the thread group of the
 * thread that triggers its creation.
 *
 * @param n_proc the number of threads in the pool
 * @param namePrefix the prefix for worker names; may be {@code null}
 * @param use_caching_thread whether workers are {@code CachingThread} instances rather than plain threads
 * @return the configured executor
 */
static public final ThreadPoolExecutor newFixedThreadPool(final int n_proc, final String namePrefix, final boolean use_caching_thread) {
	final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(n_proc);
	// Shared by every thread the factory creates; yields the numeric name suffix.
	final AtomicInteger sequence = new AtomicInteger(0);
	pool.setThreadFactory(new ThreadFactory() {
		@Override
		public Thread newThread(final Runnable r) {
			final ThreadGroup group = Thread.currentThread().getThreadGroup();
			final String prefix = null == namePrefix ? group.getName() : namePrefix;
			final String threadName = new StringBuilder(prefix).append('-').append(sequence.incrementAndGet()).toString();
			final Thread worker = use_caching_thread
					? new CachingThread(group, r, threadName)
					: new Thread(group, r, threadName);
			worker.setDaemon(true);
			worker.setPriority(Thread.NORM_PRIORITY);
			return worker;
		}
	});
	return pool;
}
 
源代码2 项目: hadoop   文件: ApplicationMasterLauncher.java
/**
 * Creates the launcher thread pool and initializes the service with a copy of
 * {@code conf} in which the IPC client's "max retries on socket timeouts" setting
 * is overridden by the configured node-manager connect retry count.
 *
 * @param conf the configuration to read the thread count and retry count from
 * @throws Exception if the superclass initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  final int launcherThreads = conf.getInt(
      YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT,
      YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT);

  // Core and maximum size are equal, so the pool is effectively fixed-size.
  launcherPool = new ThreadPoolExecutor(launcherThreads, launcherThreads, 1,
      TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
  launcherPool.setThreadFactory(new ThreadFactoryBuilder()
      .setNameFormat("ApplicationMasterLauncher #%d")
      .build());

  final Configuration newConf = new YarnConfiguration(conf);
  final int connectRetries = conf.getInt(
      YarnConfiguration.RM_NODEMANAGER_CONNECT_RETIRES,
      YarnConfiguration.DEFAULT_RM_NODEMANAGER_CONNECT_RETIRES);
  newConf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      connectRetries);
  setConfig(newConf);
  super.serviceInit(newConf);
}
 
源代码3 项目: flink-crawler   文件: ThreadedExecutor.java
/**
 * Creates an executor backed by a fixed-size pool of {@code numThreads} threads.
 * Every worker thread carries the executor's name and belongs to a thread group
 * of the same name.
 *
 * @param name the name used for the thread group and for each worker thread
 * @param numThreads the core and maximum size of the pool
 * @param requestTimeout the request timeout stored for this executor
 */
public ThreadedExecutor(String name, int numThreads, long requestTimeout) {
    _name = name;
    _requestTimeout = requestTimeout;

    // The queue always offers with a timeout, so maximumPoolSize must equal
    // corePoolSize: otherwise every execute() call would be delayed by
    // <requestTimeout> even when another thread could have been added. With both
    // sizes identical the keepAliveTime value has no meaning.
    _pool = new ThreadPoolExecutor(numThreads, numThreads, Long.MAX_VALUE,
            TimeUnit.MILLISECONDS, new MyBlockingQueue<Runnable>());

    // All workers share one group and one name so they are easy to identify.
    final ThreadGroup workerGroup = new ThreadGroup(_name);
    _pool.setThreadFactory(task -> new Thread(workerGroup, task, _name));
}
 
源代码4 项目: c2mon   文件: SupervisionNotifierImpl.java
/**
 * Registers a supervision listener together with a dedicated executor on which it
 * will be notified. The registration runs while holding the write lock of
 * {@code listenerLock}.
 * <p>
 * Worker threads are named {@code Supervision-<a>-<b>}, where both numbers are read
 * at the moment a thread is created: {@code a} is the current number of registered
 * executors and {@code b} the executor's active thread count.
 *
 * @param supervisionListener the listener that should be notified of supervision changes
 * @param numberThreads the number of threads <b>this</b> listener should be notified on
 *  (used as both core and maximum size; core threads are allowed to time out)
 * @param queueSize the capacity of the queue holding pending supervision events (size it
 *  according to the number of DAQs/Equipments and the expected task duration; submissions
 *  are rejected with a runtime exception once the queue is full)
 * @return a lifecycle handle wrapping the listener's executor
 */
@Override
public Lifecycle registerAsListener(final SupervisionListener supervisionListener, final int numberThreads, final int queueSize) {
  listenerLock.writeLock().lock();
  try {
    final ThreadPoolExecutor notifierPool = new ThreadPoolExecutor(numberThreads, numberThreads,
            DEFAULT_THREAD_TIMEOUT, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
            new ThreadPoolExecutor.AbortPolicy());
    // The name is computed lazily, each time the pool asks for a new thread.
    notifierPool.setThreadFactory(task ->
        new Thread(task, "Supervision-" + executors.size() + "-" + notifierPool.getActiveCount()));
    notifierPool.allowCoreThreadTimeOut(true);

    executors.put(supervisionListener, notifierPool);
    supervisionListeners.add(supervisionListener);
    return new ExecutorLifecycleHandle(notifierPool);
  } finally {
    listenerLock.writeLock().unlock();
  }
}
 
源代码5 项目: htmlunit   文件: WebClient.java
/**
 * Returns the executor for this webclient, creating it on first access.
 * The lazily created executor is a fixed pool of 10 threads whose default
 * thread factory is wrapped in a {@code ThreadNamingFactory}.
 * @return the executor
 */
public synchronized Executor getExecutor() {
    if (executor_ != null) {
        return executor_;
    }

    final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    pool.setThreadFactory(new ThreadNamingFactory(pool.getThreadFactory()));
    // Core threads are intentionally not prestarted; they are created on demand.
    executor_ = pool;
    return executor_;
}
 
源代码6 项目: sofa-rpc   文件: AbstractHttpServer.java
/**
 * Creates the business thread pool for this server and configures its thread
 * naming, rejection handling and optional core-thread prestart.
 * <p>
 * Worker threads are named with the prefix {@code SEV-<PROTOCOL>-BIZ-<port>}, where
 * {@code <PROTOCOL>} is the configured protocol in upper case.
 *
 * @param serverConfig the server configuration supplying protocol, port, daemon flag
 *                     and the prestart setting
 * @return the configured thread pool
 */
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
    ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
    // Upper-case with Locale.ROOT so the thread name does not depend on the JVM's
    // default locale (e.g. under a Turkish locale 'i' would become dotted 'İ').
    threadPool.setThreadFactory(new NamedThreadFactory(
        "SEV-" + serverConfig.getProtocol().toUpperCase(java.util.Locale.ROOT)
            + "-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
    threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
    if (serverConfig.isPreStartCore()) { // start all core threads up front
        threadPool.prestartAllCoreThreads();
    }
    return threadPool;
}
 
源代码7 项目: sofa-rpc   文件: TripleServer.java
/**
 * Creates the business thread pool for the triple server. Worker threads use the
 * name prefix {@code SEV-TRIPLE-BIZ-<port>}; core threads are prestarted when the
 * configuration asks for it.
 *
 * @param serverConfig the server configuration supplying port, daemon flag and the
 *                     prestart setting
 * @return the configured thread pool
 */
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
    final ThreadPoolExecutor bizPool = BusinessPool.initPool(serverConfig);

    final String threadNamePrefix = "SEV-TRIPLE-BIZ-" + serverConfig.getPort();
    bizPool.setThreadFactory(new NamedThreadFactory(threadNamePrefix, serverConfig.isDaemon()));
    bizPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());

    if (serverConfig.isPreStartCore()) {
        // Start all core threads up front.
        bizPool.prestartAllCoreThreads();
    }
    return bizPool;
}
 
源代码8 项目: sofa-rpc   文件: BoltServer.java
/**
 * Creates the business thread pool for the bolt server. Worker threads use the
 * name prefix {@code SEV-BOLT-BIZ-<port>}; core threads are prestarted when the
 * configuration asks for it.
 *
 * @param serverConfig the server configuration supplying port, daemon flag and the
 *                     prestart setting
 * @return the configured thread pool
 */
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
    final ThreadPoolExecutor workerPool = BusinessPool.initPool(serverConfig);

    final String namePrefix = "SEV-BOLT-BIZ-" + serverConfig.getPort();
    final NamedThreadFactory namedFactory = new NamedThreadFactory(namePrefix, serverConfig.isDaemon());
    workerPool.setThreadFactory(namedFactory);
    workerPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());

    if (serverConfig.isPreStartCore()) {
        // Start all core threads up front.
        workerPool.prestartAllCoreThreads();
    }
    return workerPool;
}
 
源代码9 项目: openjdk-jdk9   文件: ThreadPoolExecutorTest.java
/**
 * getThreadFactory returns the very factory instance most recently passed to
 * setThreadFactory
 */
public void testSetThreadFactory() {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool)) {
        final ThreadFactory replacement = new SimpleThreadFactory();
        pool.setThreadFactory(replacement);
        assertSame(replacement, pool.getThreadFactory());
    }
}
 
/**
 * getThreadFactory on a CustomTPE returns the very factory instance most recently
 * passed to setThreadFactory
 */
public void testSetThreadFactory() {
    final ThreadPoolExecutor customPool =
        new CustomTPE(1, 2,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(customPool)) {
        final ThreadFactory installed = new SimpleThreadFactory();
        customPool.setThreadFactory(installed);
        assertSame(installed, customPool.getThreadFactory());
    }
}
 
源代码11 项目: Dayon   文件: DeCompressorEngine.java
/**
 * Starts the de-compression executor: a single worker thread fed by an unbounded
 * {@link LinkedBlockingQueue}, plus a fair {@link Semaphore} holding
 * {@code queueSize} permits.
 *
 * @param queueSize the number of permits of the semaphore; presumably the maximum
 *                  number of pending tasks, acquired by the submitting code outside
 *                  this method (TODO confirm against the caller)
 */
public void start(int queueSize) {
	// THREAD = 1
	//
	// The parallel processing happens within the de-compressor itself. Here we
	// want to guarantee a certain order of processing; if more than one thread
	// is ever needed, first check how the de-compressed data are sent to the
	// GUI (!)

	executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

	executor.setThreadFactory(new DefaultThreadFactoryEx("DeCompressorEngine"));

	// Rejection Policy
	//
	// Blocking pattern when the queue is full: a full queue means we are not
	// decompressing fast enough. The network receiving thread then stops
	// reading from the assisted side, which in turn slows down the sending of
	// its captures and leaves us some time to catch up.
	//
	// A full queue is quite unlikely: the network should limit the number of
	// captures/tiles being sent, and decompressing is presumably much faster
	// than compressing (unless our PC is quite weak compared to the assisted
	// one; let's not forget the Java capture is awful performance-wise as
	// well) => should be fine here.

	// Fair semaphore: waiting threads obtain permits in FIFO order.
	semaphore = new Semaphore(queueSize, true);
}
 
源代码12 项目: hadoop   文件: DelegationTokenRenewer.java
/**
 * Creates the thread pool used for delegation token renewal. The pool has a
 * configurable fixed size, names its threads {@code DelegationTokenRenewer #<n>}
 * and lets idle core threads terminate after 3 seconds.
 *
 * @param conf the configuration supplying the renewer thread count
 * @return the newly created pool
 */
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
  final int renewerThreads = conf.getInt(
      YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
      YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);

  final ThreadPoolExecutor renewerPool = new ThreadPoolExecutor(
      renewerThreads, renewerThreads,
      3L, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>());
  renewerPool.setThreadFactory(new ThreadFactoryBuilder()
      .setNameFormat("DelegationTokenRenewer #%d")
      .build());
  // Idle core threads may exit once the keep-alive time has elapsed.
  renewerPool.allowCoreThreadTimeOut(true);
  return renewerPool;
}
 
源代码13 项目: big-c   文件: DelegationTokenRenewer.java
/**
 * Builds the executor that runs delegation token renewals: core and maximum size
 * both come from the configured renewer thread count, threads are named
 * {@code DelegationTokenRenewer #<n>}, and core threads time out after 3 seconds
 * of idleness.
 *
 * @param conf the configuration to read the thread count from
 * @return the configured executor
 */
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
  final int poolSize = conf.getInt(
      YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
      YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
  final ThreadFactory namedFactory =
      new ThreadFactoryBuilder().setNameFormat("DelegationTokenRenewer #%d").build();

  final ThreadPoolExecutor executor = new ThreadPoolExecutor(
      poolSize, poolSize, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
  executor.setThreadFactory(namedFactory);
  executor.allowCoreThreadTimeOut(true);
  return executor;
}
 
源代码14 项目: j2objc   文件: ThreadPoolExecutorSubclassTest.java
/**
 * After setThreadFactory, getThreadFactory hands back the identical factory object
 */
public void testSetThreadFactory() {
    final ThreadPoolExecutor executor =
        new CustomTPE(1, 2,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(executor)) {
        final ThreadFactory expected = new SimpleThreadFactory();
        executor.setThreadFactory(expected);
        assertSame(expected, executor.getThreadFactory());
    }
}
 
源代码15 项目: big-c   文件: ReadaheadPool.java
/**
 * Creates the readahead pool: a bounded-queue executor with daemon threads named
 * {@code Readahead Thread #<n>} that discards the oldest queued request when a
 * new one cannot be accepted.
 */
private ReadaheadPool() {
  pool = new ThreadPoolExecutor(
      POOL_SIZE, MAX_POOL_SIZE,
      3L, TimeUnit.SECONDS,
      new ArrayBlockingQueue<Runnable>(CAPACITY));
  pool.setThreadFactory(
      new ThreadFactoryBuilder()
          .setNameFormat("Readahead Thread #%d")
          .setDaemon(true)
          .build());
  // On saturation, drop the oldest pending request in favour of the new one.
  pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
}
 
源代码16 项目: j2objc   文件: ThreadPoolExecutorTest.java
/**
 * Passing null to setThreadFactory is rejected with a NullPointerException
 */
public void testSetThreadFactoryNull() {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool)) {
        try {
            pool.setThreadFactory(null);
            // Reaching this line means no exception was raised.
            shouldThrow();
        } catch (NullPointerException success) {
            // expected
        }
    }
}
 
源代码17 项目: Dayon   文件: CompressorEngine.java
/**
 * Starts the compression executor: a single worker thread fed by a bounded
 * {@link ArrayBlockingQueue} of {@code queueSize} entries. When a task is rejected
 * and the executor is not shut down, the handler drains every queued task, merges
 * their captures into the rejected task's capture and resubmits that task.
 *
 * @param queueSize the capacity of the queue holding pending tasks
 */
public void start(int queueSize) {
	// THREAD = 1
	//
	// The parallel processing happens within the compressor itself. Here we
	// want to guarantee a certain order of processing; if more than one thread
	// is ever needed, first check how the compressed data are sent over the
	// network (!)

	// QUEUESIZE = 1
	//
	// Do we need more than one here ?
	//
	// - queue full because we could not compress the last capture when a new
	// one is available => too many captures (!)
	//
	// - we could not send the last compressed capture over the network as the
	// network queue is full => too many captures (!)

	executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize));

	executor.setThreadFactory(new DefaultThreadFactoryEx("CompressorEngine"));

	// Rejection handling: instead of dropping work, fold everything still queued
	// into the rejected (newest) task and submit that one again. Nothing is done
	// once the executor has been shut down.
	executor.setRejectedExecutionHandler((runnable, poolExecutor) -> {
           if (!poolExecutor.isShutdown()) {
               final List<Runnable> pendings = new ArrayList<>();

               // pendings : oldest first (!)
               poolExecutor.getQueue().drainTo(pendings);

               // Every task handled here is expected to be a MyExecutable.
               final MyExecutable newer = (MyExecutable) runnable;

               if (!pendings.isEmpty()) {
                   final Capture[] cpendings = new Capture[pendings.size()];

                   int pos = 0;

                   // Walk the drained list backwards so cpendings is ordered newest first.
                   for (int idx = pendings.size() - 1; idx > -1; idx--) {
                       cpendings[pos++] = ((MyExecutable) pendings.get(idx)).capture;
                   }

                   // NOTE(review): presumably folds the dirty tiles of the older
                   // captures into the newest one - confirm in Capture.
                   newer.capture.mergeDirtyTiles(cpendings);
               }

               // The queue was just drained, so the merged task is submitted again.
               poolExecutor.execute(newer);
           }
       });
}
 
源代码18 项目: codebuff   文件: MoreExecutors.java
/**
 * Replaces the executor's thread factory with one that delegates to the current
 * factory but marks every created thread as a daemon.
 *
 * @param executor the executor whose thread factory is wrapped
 */
@GwtIncompatible // TODO
private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
  ThreadFactoryBuilder daemonBuilder =
      new ThreadFactoryBuilder()
          .setDaemon(true)
          .setThreadFactory(executor.getThreadFactory());
  executor.setThreadFactory(daemonBuilder.build());
}
 
源代码19 项目: codebuff   文件: MoreExecutors.java
/**
 * Makes the given executor create daemon threads: its existing thread factory is
 * kept as the backing factory and wrapped so that new threads are daemons.
 *
 * @param executor the executor to reconfigure
 */
@GwtIncompatible // TODO
private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
  ThreadFactoryBuilder wrappingBuilder = new ThreadFactoryBuilder();
  wrappingBuilder.setDaemon(true);
  wrappingBuilder.setThreadFactory(executor.getThreadFactory());
  executor.setThreadFactory(wrappingBuilder.build());
}
 
源代码20 项目: asteria-3.0   文件: GameSyncExecutor.java
/**
 * Creates and configures the update service for this game sync executor.
 * The returned executor is <b>unconfigurable</b>, meaning its configuration
 * can no longer be modified. Tasks rejected by the pool are run on the
 * submitting thread.
 * 
 * @param nThreads
 *            the amount of threads to create this service with.
 * @return the newly created and configured service, or {@code null} when
 *         {@code nThreads} is 1 or less.
 */
private ExecutorService create(int nThreads) {
    if (nThreads <= 1) {
        return null;
    }

    ThreadPoolExecutor syncPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
    syncPool.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("GameSyncThread").build());
    syncPool.setRejectedExecutionHandler(new CallerRunsPolicy());
    // Wrap the pool so callers cannot cast it back and change its settings.
    return Executors.unconfigurableExecutorService(syncPool);
}