java.util.concurrent.ThreadPoolExecutor#CallerRunsPolicy() source code examples

The examples below show how real projects use java.util.concurrent.ThreadPoolExecutor#CallerRunsPolicy(); each entry names the project and file, and you can follow the links to view the full source on GitHub. A minimal illustration of what the policy does is shown first, followed by the project examples.

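Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of what CallerRunsPolicy does: when both the worker pool and its queue are saturated, a rejected task is executed directly on the thread that called execute(), which throttles the submitter instead of throwing RejectedExecutionException.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    // One worker thread, a one-slot queue, and CallerRunsPolicy as the rejection handler.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1),
        new ThreadPoolExecutor.CallerRunsPolicy());
    Runnable task = () -> {
      System.out.println("running on " + Thread.currentThread().getName());
      try {
        Thread.sleep(200); // keep the worker busy so later submissions pile up
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    pool.execute(task); // picked up by the single worker thread
    pool.execute(task); // waits in the one-slot queue
    pool.execute(task); // pool and queue are both full -> runs directly on the main thread
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
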
Example 1  Project: tls-channel   File: AsynchronousTlsChannelGroup.java
/**
 * Creates an instance of this class.
 *
 * @param nThreads number of threads in the executor used to assist the selector loop and run
 *     completion handlers.
 */
public AsynchronousTlsChannelGroup(int nThreads) {
  try {
    selector = Selector.open();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  timeoutExecutor.setRemoveOnCancelPolicy(true);
  this.executor =
      new ThreadPoolExecutor(
          nThreads,
          nThreads,
          0,
          TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier),
          runnable ->
              new Thread(runnable, String.format("async-channel-group-%d-handler-executor", id)),
          new ThreadPoolExecutor.CallerRunsPolicy());
  selectorThread.start();
}
 
Example 2  Project: Mycat2   File: DataMigrator.java
private void clearData(Map<File,DataNode> map,TableMigrateInfo table){
	if(table.isError()) {
		return;
	}
	ExecutorService executor = new ThreadPoolExecutor(margs.getThreadCount(), margs.getThreadCount(),
               0L, TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
	Iterator<Entry<File,DataNode>> it = map.entrySet().iterator();
	while(it.hasNext()){
		Entry<File,DataNode> et = it.next();
		File f = et.getKey();
		DataNode srcDn = et.getValue();
		executor.execute(new DataClearRunner(table, srcDn, f));
	}
	executor.shutdown();
	while(true){
		if(executor.isTerminated()){
			break;
		}
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			LOGGER.error("error",e);
		}
	}
}
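
The sleep-and-poll loop at the end of clearData works, but the usual JDK idiom for the same wait is ExecutorService.awaitTermination. A small stand-alone sketch of that idiom (the pool size and tasks here are illustrative, not Mycat2 code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 4; i++) {
      final int n = i;
      executor.execute(() -> System.out.println("task " + n));
    }
    executor.shutdown();
    // Equivalent of the sleep loop: block until every submitted task has finished.
    while (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) {
      // still running, keep waiting
    }
    System.out.println("all tasks finished");
  }
}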
 
Example 3  Project: pmq   File: ConsumerHeartbeatController.java
@PostConstruct
private void init() {
	heartBeatThreadSize = soaConfig.getHeartBeatThreadSize();
	executor = new ThreadPoolExecutor(1, 1, 3L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
			SoaThreadFactory.create("heartbeat",Thread.MAX_PRIORITY, true), new ThreadPoolExecutor.CallerRunsPolicy());
	executorRun = new ThreadPoolExecutor(heartBeatThreadSize, heartBeatThreadSize, 10L, TimeUnit.SECONDS,
			new ArrayBlockingQueue<>(5000), SoaThreadFactory.create("heartbeat-run",Thread.MAX_PRIORITY-1, true),
			new ThreadPoolExecutor.CallerRunsPolicy());
	soaConfig.registerChanged(new Runnable() {
		@Override
		public void run() {
			if (heartBeatThreadSize != soaConfig.getHeartBeatThreadSize()) {
				heartBeatThreadSize = soaConfig.getHeartBeatThreadSize();
				executorRun.setCorePoolSize(heartBeatThreadSize);
				executorRun.setMaximumPoolSize(heartBeatThreadSize);
			}

		}
	});
	executor.execute(() -> {
		heartbeat();
	});
}
 
Example 4  Project: activemq-artemis   File: MqttClientService.java
@PostConstruct
public void init() throws MqttException {
   final String serverURI = "tcp://localhost:1883";
   final MqttConnectOptions options = new MqttConnectOptions();
   options.setAutomaticReconnect(true);
   options.setCleanSession(false);
   options.setMaxInflight(1000);
   options.setServerURIs(new String[] {serverURI});
   options.setMqttVersion(MQTT_VERSION_3_1_1);

   final ThreadFactory threadFactory = new DefaultThreadFactory("mqtt-client-exec");
   executorService = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
   mqttClient = new MqttClient(serverURI, clientId, persistence, executorService);
   mqttClient.setTimeToWait(-1);
   mqttClient.connect(options);
   mqttClient.setCallback(this);
   log.debugf("[MQTT][Connected][client: %s]", clientId);
}
 
Example 5  Project: examples   File: BatchWriter.java
public BatchWriter(EmbeddedSolrServer solr, int batchSize, TaskID tid,
    int writerThreads, int queueSize) {
  this.solr = solr;
  this.writerThreads = writerThreads;
  this.queueSize = queueSize;
  taskId = tid;

  // we need to obtain the settings before the constructor
  if (writerThreads != 0) {
    batchPool = new ThreadPoolExecutor(writerThreads, writerThreads, 5,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize),
        new ThreadPoolExecutor.CallerRunsPolicy());
  } else { // single threaded case
    batchPool = null;
  }
}
 
static ThreadPoolExecutor getPoolFromNs(final Configuration ns) {
    final int maxQueueSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_QUEUE_MAX_LENGTH);
    final ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("getDelegate-%d").build();
    //begin adaptation of constructor at
    //https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
    final int maxPoolSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE);
    final int corePoolSize = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE);
    final long keepAlive = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE);
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxQueueSize), factory, new ThreadPoolExecutor.CallerRunsPolicy());
    //end adaptation of constructor at
    //https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
    executor.allowCoreThreadTimeOut(false);
    executor.prestartAllCoreThreads();
    return executor;
}
 
Example 7  Project: multi-task   File: CustomizedParallelFetchTest.java
/**
 * Tests parallel fetching using a client-supplied thread pool.
 */
@Test
public void testParallelFetchByClientExecutorPool() {

    ExecutorService threadPool =
            new ThreadPoolExecutor(2, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    QueryParam qp = new QueryParam();

    MultiResult ctx =
            parallelExePool.submit(
                    threadPool,
                    new TaskPair("deviceStatFetcher", DeviceRequest.build(qp)),
                    new TaskPair("deviceUvFetcher", DeviceRequest.build(qp)));

    List<DeviceViewItem> stat = ctx.getResult("deviceStatFetcher");
    List<DeviceViewItem> uv = ctx.getResult("deviceUvFetcher");

    Assert.notEmpty(stat);
    Assert.notEmpty(uv);
    System.out.println(stat);
    System.out.println(uv);
}
 
private DynamicallySizedThreadPoolExecutor createInstance(int corePoolSize, int maximumPoolSize, int keepAliveTime)
{
    // We need a thread factory
    TraceableThreadFactory threadFactory = new TraceableThreadFactory();
    threadFactory.setThreadDaemon(true);
    threadFactory.setThreadPriority(Thread.NORM_PRIORITY);

    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();

    return new DynamicallySizedThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            workQueue,
            threadFactory,
            new ThreadPoolExecutor.CallerRunsPolicy());
}
 
Example 9  Project: EVCache   File: EVCacheClientPoolManager.java
@Inject
public EVCacheClientPoolManager(IConnectionBuilder connectionFactoryprovider, EVCacheNodeList evcacheNodeList, EVCacheConfig evcConfig) {
    instance = this;

    this.connectionFactoryProvider = connectionFactoryprovider;
    this.evcacheNodeList = evcacheNodeList;
    this.evcConfig = evcConfig;
    this.evcacheEventListenerList = new CopyOnWriteArrayList<EVCacheEventListener>();

    String clientCurrentInstanceId = null;
    if(clientCurrentInstanceId == null) clientCurrentInstanceId= System.getenv("EC2_INSTANCE_ID");
    if(clientCurrentInstanceId == null) clientCurrentInstanceId= System.getenv("NETFLIX_INSTANCE_ID");
    if(log.isInfoEnabled()) log.info("\nClient Current InstanceId from env = " + clientCurrentInstanceId);
    if(clientCurrentInstanceId == null && EVCacheConfig.getInstance().getPropertyRepository() != null) clientCurrentInstanceId = EVCacheConfig.getInstance().getPropertyRepository().get("EC2_INSTANCE_ID", String.class).orElse(null).get();
    if(clientCurrentInstanceId == null && EVCacheConfig.getInstance().getPropertyRepository() != null) clientCurrentInstanceId = EVCacheConfig.getInstance().getPropertyRepository().get("NETFLIX_INSTANCE_ID", String.class).orElse(null).get();

    if(clientCurrentInstanceId != null && !clientCurrentInstanceId.equalsIgnoreCase("localhost")) {
        this.defaultReadTimeout = EVCacheConfig.getInstance().getPropertyRepository().get("default.read.timeout", Integer.class).orElse(20);
        if(log.isInfoEnabled()) log.info("\nClient Current InstanceId = " + clientCurrentInstanceId + " which is probably a cloud location. The default.read.timeout = " + defaultReadTimeout);
    } else { //Assuming this is not in cloud so bump up the timeouts
        this.defaultReadTimeout = EVCacheConfig.getInstance().getPropertyRepository().get("default.read.timeout", Integer.class).orElse(750);
        if(log.isInfoEnabled()) log.info("\n\nClient Current InstanceId = " + clientCurrentInstanceId + ". Probably a non-cloud instance. The default.read.timeout = " + defaultReadTimeout + "\n\n");
    }
    this.logEnabledApps = EVCacheConfig.getInstance().getPropertyRepository().get("EVCacheClientPoolManager.log.apps", String.class).orElse("*");
    this.defaultRefreshInterval = EVCacheConfig.getInstance().getPropertyRepository().get("EVCacheClientPoolManager.refresh.interval", Integer.class).orElse(60);

    this.asyncExecutor = new EVCacheScheduledExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(), 30, TimeUnit.SECONDS, new ThreadPoolExecutor.CallerRunsPolicy(), "scheduled");
    asyncExecutor.prestartAllCoreThreads();
    this.syncExecutor = new EVCacheExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(), 30, TimeUnit.SECONDS, new ThreadPoolExecutor.CallerRunsPolicy(), "pool");
    syncExecutor.prestartAllCoreThreads();

    initAtStartup();
}
 
Example 10  Project: LuckyFrameClient   File: BatchTestCaseExecution.java
/**
 * Creates a thread pool and runs the test cases in multiple threads.
 * @param taskid task ID
 * @param batchcase batch of case IDs as a single string, separated by '#'
 * @throws Exception if execution fails
 */

public static void batchCaseExecuteForTast(String taskid, String batchcase) throws Exception{
	int threadcount = GetServerApi.cGetTaskSchedulingByTaskId(Integer.parseInt(taskid)).getExThreadCount();
	ThreadPoolExecutor	threadExecute	= new ThreadPoolExecutor(threadcount, 30, 3, TimeUnit.SECONDS,
			new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());
	//Execute all cases of the task that are not yet in a successful state
	if(batchcase.contains("ALLFAIL")){
		//Initialize the case-result log handler
		serverOperation caselog = new serverOperation();
		List<Integer> caseIdList = caselog.getCaseListForUnSucByTaskId(taskid);
		for (Integer integer : caseIdList) {
			ProjectCase testcase = GetServerApi.cGetCaseByCaseId(integer);
			TestControl.THREAD_COUNT++;   //increment the global thread counter, used to check whether all case threads have finished
			threadExecute.execute(new ThreadForBatchCase(testcase.getCaseId(), taskid));
		}
	}else{                                           //execute the specified batch of cases
		String[] temp=batchcase.split("#");
		LogUtil.APP.info("The current task has {} cases to execute, starting execution...",temp.length);
		for (String s : temp) {
			TestControl.THREAD_COUNT++;   //increment the global thread counter, used to check whether all case threads have finished
			threadExecute.execute(new ThreadForBatchCase(Integer.valueOf(s), taskid));
		}
	}
	//Check the global thread counter to see whether all case threads have finished (with a timeout)
	int i=0;
	while(TestControl.THREAD_COUNT!=0){
		i++;
		if(i>600){
			break;
		}
		Thread.sleep(6000);
	}
	threadExecute.shutdown();
}
 
Example 11  Project: j2objc   File: ThreadPoolExecutorTest.java
/**
 * execute using CallerRunsPolicy drops task on shutdown
 */
public void testCallerRunsOnShutdown() {
    RejectedExecutionHandler h = new ThreadPoolExecutor.CallerRunsPolicy();
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(1), h);

    try { p.shutdown(); } catch (SecurityException ok) { return; }
    try (PoolCleaner cleaner = cleaner(p)) {
        TrackedNoOpRunnable r = new TrackedNoOpRunnable();
        p.execute(r);
        assertFalse(r.done);
    }
}
 
Example 12  Project: connector-sdk   File: FullTraversalConnector.java
/**
 * Creates all objects needed for a traversal.
 *
 * @param context the context used to get the configuration
 * @throws Exception if configuration parameters are invalid
 */
@Override
public void init(IndexingConnectorContext context) throws Exception {
  checkState(Configuration.isInitialized(), "configuration not initialized");
  indexingService = checkNotNull(context.getIndexingService());
  defaultAcl = DefaultAcl.fromConfiguration(indexingService);
  repositoryContext =
      new RepositoryContext.Builder()
          .setEventBus(new EventBus("EventBus-" + getClass().getName()))
          .setDefaultAclMode(defaultAcl.getDefaultAclMode())
          .build();
  if (checkpointHandler == null) {
    checkpointHandler = LocalFileCheckpointHandler.fromConfiguration();
  }

  useQueues = Configuration.getBoolean(TRAVERSE_USE_QUEUES, DEFAULT_USE_QUEUES).get();
  QueueCheckpoint.Builder builder = new QueueCheckpoint.Builder(useQueues);
  if (useQueues) {
    String tag =
        Configuration.getString(TRAVERSE_QUEUE_TAG, repository.getClass().getSimpleName()).get();
    builder.setCheckpointHandler(checkpointHandler)
        .setQueueA(QUEUE_NAME + "A: " + tag)
        .setQueueB(QUEUE_NAME + "B: " + tag);
  }
  queueCheckpoint = builder.build();

  numThreads = Configuration.getInteger(NUM_THREADS, DEFAULT_THREAD_NUM).get();
  // TODO(bmj): Fix this gross violation of encapsulation.
  numToAbort = Configuration
      .getValue(TraverseExceptionHandlerFactory.TRAVERSE_EXCEPTION_HANDLER, 0L, value -> {
        if (IGNORE_FAILURE.equals(value)) {
          return Long.MAX_VALUE;
        } else {
          try {
            return Long.parseLong(value);
          } catch (NumberFormatException e) {
            throw new InvalidConfigurationException(
                "Unrecognized value for traversal exception handler: " + value, e);
          }
        }
      }).get();

  threadPoolExecutor =
      new ThreadPoolExecutor(
          numThreads,
          numThreads,
          0,
          TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<Runnable>(10 * numThreads),
          new ThreadPoolExecutor.CallerRunsPolicy());
  listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
  partitionSize = Configuration.getInteger(TRAVERSE_PARTITION_SIZE, DEFAULT_PARTITION_SIZE).get();
  Configuration.checkConfiguration(
      partitionSize > 0,
      "Partition size can not be less than or equal to 0. Configured value %s",
      partitionSize);
  repositoryContext.getEventBus().register(this);
  repository.init(repositoryContext);
  logger.log(Level.INFO, "start full traversal connector executors");
}
 
Example 13  Project: joyqueue   File: ProduceArchiveService.java
@Override
protected void validate() throws Exception {
    super.validate();
    archiveStore = archiveStore != null ? archiveStore : Plugins.ARCHIVESTORE.get();
    archiveStore.setNameSpace(archiveConfig.getNamespace());

    logger.info("Get archive store namespace [{}] by archive config.", archiveConfig.getNamespace());

    Preconditions.checkArgument(archiveStore != null, "archive store can not be null.");
    Preconditions.checkArgument(archiveConfig != null, "archive config can not be null.");

    this.tracer = Plugins.TRACERERVICE.get(archiveConfig.getTracerType());
    this.batchNum = archiveConfig.getProduceBatchNum();
    this.archiveQueue = new LinkedBlockingDeque<>(archiveConfig.getLogQueueSize());
    this.executorService = new ThreadPoolExecutor(archiveConfig.getWriteThreadNum(), archiveConfig.getWriteThreadNum(),
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(archiveConfig.getThreadPoolQueueSize()), new NamedThreadFactory("sendLog-archive"), new ThreadPoolExecutor.CallerRunsPolicy());
    this.updateItemThread = LoopThread.builder()
            .sleepTime(1000 * 10, 1000 * 10)
            .name("UpdateArchiveItem-Thread")
            .onException(e -> logger.warn("Exception:", e))
            .doWork(() -> {
                // refresh the archive item list
                updateArchiveItem();
                // sync the archive position
                syncArchivePosition();
            }).build();

    this.readMsgThread = LoopThread.builder()
            .sleepTime(0, 10)
            .name("ReadArchiveMsg-Thread")
            .onException(e -> logger.warn("Exception:", e))
            .doWork(() -> {
                // read messages via the consume interface and put them into the queue
                readArchiveMsg();
            }).build();

    this.writeMsgThread = LoopThread.builder()
            .sleepTime(10, 10)
            .name("WriteArchiveMsg-Thread")
            .onException(e -> logger.warn("Exception:", e))
            .doWork(() -> {
                // take messages from the queue and write them to the archive store
                write2Store();
            }).build();


}
 
Example 14  Project: dawnsci   File: Slicer.java
public static void visitParallel(ISliceViewIterator iterator, final SliceVisitor visitor, int nProcessors) throws Exception {

		//Each slice cannot simply be farmed out to a separate thread: submission has to block when the
		//thread pool is full, otherwise all the data may be loaded (and memory exhausted) before any of it is processed.
		
		//Use one thread fewer than the number of processors, since the calling thread does work via the rejected-execution handler.
		nProcessors = nProcessors - 1;

		BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(nProcessors);
	    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
	    final ExecutorService pool =  new ThreadPoolExecutor(nProcessors, nProcessors, 
	        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
	    
		final SliceVisitor parallel = new SliceVisitor() {

			@Override
			public void visit(final IDataset slice) throws Exception {
				
				pool.execute(new Runnable() {
					
					@Override
					public void run() {
						try {
							
						    visitor.visit(slice);
						    
						    
						} catch (Throwable ne) {
							ne.printStackTrace();
							// TODO Fix me - should runtime exception really be thrown back to Fork/Join?
							//throw new RuntimeException(ne.getMessage(), ne);
						}
					}
				});
			}

			@Override
			public boolean isCancelled() {
				return visitor.isCancelled();
			}
		};
		
		Slicer.visit(iterator, parallel);
		
		pool.shutdown();
		
		while (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)){
			if (visitor.isCancelled()) break;
		}

	}
 
Example 15  Project: streams   File: ExecutorUtils.java
public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
  return new ThreadPoolExecutor(numThreads, numThreads,
    5000L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
 
Example 16  Project: openjdk-jdk9   File: StressAddMultiThreadedTest.java
public StressAddMultiThreadedTest() {
    queue = new ArrayBlockingQueue<>(THREADS);
    executor = new ThreadPoolExecutor(THREADS, THREADS, 100,
            TimeUnit.MILLISECONDS, queue,
            new ThreadPoolExecutor.CallerRunsPolicy());
}
 
Example 17  Project: streams   File: GMailProvider.java
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
  return new ThreadPoolExecutor(nThreads, nThreads,
      5000L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
 
Example 18  Project: javan-warty-pig   File: Util.java
public CurrentThreadExecutorService() {
  super(0, 1, 0, TimeUnit.SECONDS,
      new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
}
 
Example 19  Project: bboxdb   File: ExecutorUtil.java
/**
 * Returns an executor that holds up to maxQueueSize unprocessed tasks
 * and runs up to maxThreads tasks in parallel. When the queue is full,
 * a newly submitted task is rejected from the queue and executed
 * directly in the calling thread.
 *
 * @param maxThreads the maximum number of worker threads
 * @param maxQueueSize the maximum number of queued tasks
 * @return the configured thread pool executor
 */
public static ThreadPoolExecutor getBoundThreadPoolExecutor(final int maxThreads, final int maxQueueSize) {
	// The queue holds up to maxQueueSize unexecuted tasks
	final BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(maxQueueSize);
	
	// The Pool executor executes up to maxThreads in parallel. If the queue is full, the caller
	// has to execute the task directly. 
	return new ThreadPoolExecutor(maxThreads / 2, maxThreads, 30, TimeUnit.SECONDS, 
			linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
}
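
A short usage sketch of the helper above, assuming ExecutorUtil is imported from the project (the pool size, queue size, and task body are illustrative only):

public class BoundExecutorUsage {
  public static void main(String[] args) {
    // Up to 4 worker threads and at most 100 queued tasks; once the pool and the
    // queue are both saturated, further submissions run on the submitting (main) thread.
    java.util.concurrent.ThreadPoolExecutor executor = ExecutorUtil.getBoundThreadPoolExecutor(4, 100);
    for (int i = 0; i < 1000; i++) {
      final int taskId = i;
      executor.execute(() -> System.out.println("task " + taskId));
    }
    executor.shutdown();
  }
}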
 
/**
 * Returns a counting handler for rejected tasks that runs the rejected task directly in the
 * calling thread of the execute method, unless the executor has been shut down, in which case
 * the task is discarded.
 */
public static CountingRejectedExecutionHandler newCallerRunsPolicy() {
    return new CountingRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
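
CountingRejectedExecutionHandler is a project-specific class, so its exact API is not shown here. As a hypothetical sketch only, a handler with the behavior described above can be built by counting each rejection and delegating to a wrapped CallerRunsPolicy, which itself runs the task on the calling thread and silently discards it once the executor has been shut down:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; the real CountingRejectedExecutionHandler may differ.
public class CountingCallerRunsHandler implements RejectedExecutionHandler {
  private final AtomicLong rejectedCount = new AtomicLong();
  private final RejectedExecutionHandler delegate = new ThreadPoolExecutor.CallerRunsPolicy();

  @Override
  public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
    rejectedCount.incrementAndGet();
    // CallerRunsPolicy runs the task on the calling thread, or drops it silently
    // if the executor has already been shut down.
    delegate.rejectedExecution(task, executor);
  }

  public long getRejectedCount() {
    return rejectedCount.get();
  }
}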