类java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy源码实例Demo

下面列出了 java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: t-io   文件: Threads.java
/**
 * Returns the shared tio worker executor, creating it on the first call.
 *
 * <p>An unsynchronized null check serves as the fast path. Creation happens
 * while holding the {@code Threads.class} monitor, where the field is checked
 * a second time so that only one caller builds the pool.
 *
 * <p>The pool is built with {@code MAX_POOL_SIZE_FOR_TIO} as both core and
 * maximum size and an {@link LinkedBlockingQueue} created without a capacity
 * bound.
 *
 * <p>NOTE(review): the fast path is only safe if {@code tioExecutor} is
 * declared {@code volatile}; the declaration is not visible here - verify.
 * NOTE(review): presumably {@code SynThreadPoolExecutor} extends
 * {@code ThreadPoolExecutor}; if so, with an unbounded queue the rejection
 * policy is normally only consulted after shutdown - confirm this is intended.
 *
 * @return the shared executor instance
 * @author tanyaowu
 */
public static SynThreadPoolExecutor getTioExecutor() {
	if (tioExecutor == null) {
		synchronized (Threads.class) {
			// Re-check under the lock: another caller may have won the race.
			if (tioExecutor == null) {
				LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
				String workerName = "tio-worker";
				DefaultThreadFactory workerFactory = DefaultThreadFactory.getInstance(workerName, Thread.MAX_PRIORITY);
				CallerRunsPolicy rejectionPolicy = new TioCallerRunsPolicy();

				tioExecutor = new SynThreadPoolExecutor(MAX_POOL_SIZE_FOR_TIO, MAX_POOL_SIZE_FOR_TIO, KEEP_ALIVE_TIME,
				        workQueue, workerFactory, workerName, rejectionPolicy);

				// Start a single core thread up front; the remaining ones are left to start on demand.
				tioExecutor.prestartCoreThread();
			}
		}
	}
	return tioExecutor;
}
 
/**
 * Creates a thread pool with the default worker configuration. The settings
 * are chosen to protect throughput:
 * <ol>
 *   <li>{@code CORE_POOL_SIZE} core threads serve small workloads (few
 *       channels) with low latency. The original note cites 32 threads; the
 *       constant is defined elsewhere. This method does not prestart them,
 *       so they are created as tasks arrive.</li>
 *   <li>The work queue is bounded by {@code BLOCKING_QUEUE_SIZE} and kept
 *       small, so that under heavy load (many channels) the queue fills
 *       quickly and the pool grows toward {@code MAX_CORE_POOL_SIZE}.</li>
 *   <li>Idle threads above the core size are reclaimed after
 *       {@code KEEP_ALIVE_SECONDS} seconds (the original note cites 60s).</li>
 *   <li>When both the queue and the pool are saturated, the submitting thread
 *       runs the task itself ({@link CallerRunsPolicy}).</li>
 * </ol>
 *
 * @param threadPrefix prefix for worker thread names; a per-pool counter
 *                     starting at 0 is appended to it
 * @return a newly created thread pool
 */
public static ThreadPoolExecutor newDefaultThreadPool(final String threadPrefix) {
    final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_SIZE);

    // Names each worker "<prefix><n>" and logs its creation.
    final ThreadFactory namingFactory = new ThreadFactory() {
        private final AtomicInteger sequence = new AtomicInteger();

        @Override
        public Thread newThread(Runnable task) {
            final String name = threadPrefix + sequence.getAndIncrement();
            LOG.info("TunnelWorkerConfig new thread: " + name);
            return new Thread(task, name);
        }
    };

    return new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_CORE_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
        workQueue, namingFactory, new CallerRunsPolicy());
}
 
源代码3 项目: riptide   文件: ThreadPoolExecutorsTest.java
@Test
void boundedQueueElasticSize() {
    // Build a pool with a one-slot bounded queue that may scale between 0 and 1 threads.
    final ThreadPoolExecutor pool = ThreadPoolExecutors.builder()
            .boundedQueue(1)
            .elasticSize(0, 1)
            .keepAlive(Duration.ofMinutes(1))
            .threadFactory(Executors.defaultThreadFactory())
            .handler(new CallerRunsPolicy())
            .build();

    // The elastic bounds map directly onto core and maximum pool size.
    assertThat(pool.getCorePoolSize(), is(0));
    assertThat(pool.getMaximumPoolSize(), is(1));
    // A bounded queue is backed by an ArrayBlockingQueue.
    assertThat(pool.getQueue(), is(instanceOf(ArrayBlockingQueue.class)));
    // Core-thread timeout stays disabled for this configuration.
    assertThat(pool.allowsCoreThreadTimeOut(), is(false));
}
 
源代码4 项目: hawkbit   文件: ExecutorAutoConfiguration.java
/**
 * Creates the executor for UI background processes.
 *
 * <p>The underlying pool has 1 core thread, at most 20 threads, a keep-alive
 * of 10000 ms and a bounded queue of 20 tasks. Once queue and pool are both
 * full, the submitting thread runs the task itself (caller-runs policy).
 * Threads are named {@code ui-executor-pool-<n>}.
 *
 * @return the pool wrapped in a {@code DelegatingSecurityContextExecutor}
 */
@Bean(name = "uiExecutor")
@ConditionalOnMissingBean(name = "uiExecutor")
public Executor uiExecutor() {
    final ThreadPoolExecutor uiPool = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20),
            new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    return new DelegatingSecurityContextExecutor(uiPool);
}
 
源代码5 项目: asteria-3.0   文件: ServiceQueue.java
/**
 * Creates and configures a new single-threaded
 * {@link ScheduledExecutorService} with a caller-runs rejection policy and
 * threads named {@code ServiceQueueThread}.
 *
 * <p>When {@code seconds} is greater than zero it is applied as the keep-alive
 * time and core-thread timeout is enabled. When {@code seconds} is zero no
 * timeout is configured and core-thread timeout is left disabled. Negative
 * values are rejected by the precondition check.
 *
 * @param seconds
 *            the idle timeout in seconds; must be zero or greater.
 * @return the newly created and configured executor service.
 */
private static ScheduledExecutorService createServiceExecutor(long seconds) {
    Preconditions.checkArgument(seconds >= 0, "The timeout value must be equal to or greater than 0!");
    ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1,
        new ThreadFactoryBuilder().setNameFormat("ServiceQueueThread").build(),
        new CallerRunsPolicy());
    if (seconds <= 0) {
        return service;
    }
    // The keep-alive must be set before enabling core-thread timeout.
    service.setKeepAliveTime(seconds, TimeUnit.SECONDS);
    service.allowCoreThreadTimeOut(true);
    return service;
}
 
/**
 * Creates the listening scheduled executor bean for cloud API work.
 *
 * <p>The delegate is an {@code MDCCleanerScheduledExecutor} sized by
 * {@code executorServicePoolSize}, with threads named {@code cloud-api-<n>}
 * and a caller-runs rejection policy.
 *
 * @return the delegate wrapped by {@code MoreExecutors.listeningDecorator}
 */
@Bean
ListeningScheduledExecutorService cloudApiListeningScheduledExecutorService() {
    final MDCCleanerScheduledExecutor delegate = new MDCCleanerScheduledExecutor(
            executorServicePoolSize,
            new ThreadFactoryBuilder().setNameFormat("cloud-api-%d").build(),
            new CallerRunsPolicy());
    return MoreExecutors.listeningDecorator(delegate);
}
 
源代码7 项目: tutorials   文件: SaturationPolicyUnitTest.java
/**
 * Checks that with a caller-runs policy a task submitted to a saturated pool
 * is executed by the submitting thread, so that {@code execute} blocks for
 * the duration of the task.
 */
@Ignore
@Test
public void givenCallerRunsPolicy_WhenSaturated_ThenTheCallerThreadRunsTheTask() {
    // One worker and a hand-off queue with no capacity: a second task cannot be queued.
    executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new CallerRunsPolicy());

    // Occupy the only worker.
    executor.execute(() -> waitFor(250));

    // The second submission is rejected and therefore runs on this thread.
    final long submittedAt = System.currentTimeMillis();
    executor.execute(() -> waitFor(500));
    final long elapsedMillis = System.currentTimeMillis() - submittedAt;

    assertThat(elapsedMillis).isGreaterThanOrEqualTo(500);
}
 
源代码8 项目: asteria-3.0   文件: GameSyncExecutor.java
/**
 * Creates and configures the update service for this game sync executor.
 * The returned executor is <b>unconfigurable</b>, meaning its configuration
 * can no longer be modified.
 *
 * <p>The service is a fixed-size pool of {@code nThreads} threads named
 * {@code GameSyncThread} with a caller-runs rejection policy.
 *
 * @param nThreads
 *            the amount of threads to create this service.
 * @return the newly created and configured service, or {@code null} when
 *         {@code nThreads} is 1 or less.
 */
private ExecutorService create(int nThreads) {
    if (nThreads <= 1) {
        return null;
    }
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads,
        new ThreadFactoryBuilder().setNameFormat("GameSyncThread").build());
    pool.setRejectedExecutionHandler(new CallerRunsPolicy());
    // Wrap the pool so callers cannot reach its configuration methods.
    return Executors.unconfigurableExecutorService(pool);
}
 
 类所在包
 同包方法