java.util.AbstractSet#java.util.concurrent.LinkedBlockingQueue源码实例Demo

下面列出了java.util.AbstractSet#java.util.concurrent.LinkedBlockingQueue 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: vjtools   文件: ThreadPoolBuilder.java
public ThreadPoolExecutor build() {
	BlockingQueue<Runnable> queue = null;
	if (queueSize < 1) {
		queue = new LinkedBlockingQueue<Runnable>();
	} else {
		queue = new ArrayBlockingQueue<Runnable>(queueSize);
	}

	threadFactory = createThreadFactory(threadFactory, threadNamePrefix, daemon);

	if (rejectHandler == null) {
		rejectHandler = defaultRejectHandler;
	}

	return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory,
			rejectHandler);
}
 
源代码2 项目: streams   文件: AbstractGPlusProvider.java
@Override
public StreamsResultSet readCurrent() {
  BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
  int batchCount = 0;
  while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
    StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
    if (datum != null) {
      ++batchCount;
      ComponentUtils.offerUntilSuccess(datum, batch);
    }
  }
  boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated();
  this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
  this.previousPullWasEmpty = pullIsEmpty;
  return new StreamsResultSet(batch);
}
 
源代码3 项目: swift-explorer   文件: AuthHttpServer.java
@Override
public Map<String, String> startAndWaitForData () throws IOException, InterruptedException
{
	InetSocketAddress addr = new InetSocketAddress(port);	
	BlockingQueue<Map<String, String> > blockingQueue = new LinkedBlockingQueue<Map<String, String> >();
	synchronized (lock)
	{
		sharedQueue = blockingQueue ;
		server = HttpServer.create(addr, 0);
		server.createContext("/", new HandlerMapParameter(blockingQueue));
		httpThreadPool = Executors.newCachedThreadPool() ;
		server.setExecutor(httpThreadPool);
		server.start();
	}
	return blockingQueue.poll(10 * 60, TimeUnit.SECONDS);
}
 
源代码4 项目: gemfirexd-oss   文件: GatewayImpl.java
/**
 * Initialize the Executor that handles listener events. Only used by
 * non-primary gateways
 */
private void initializeListenerExecutor()
{
  // Create the ThreadGroups
  final ThreadGroup loggerGroup = LogWriterImpl.createThreadGroup(
      "Gateway Listener Group", getLogger());

  // Create the Executor
  ThreadFactory tf = new ThreadFactory() {
      public Thread newThread(Runnable command) {
        Thread thread =  new Thread(loggerGroup, command, "Queued Gateway Listener Thread");
        thread.setDaemon(true);
        return thread;
      }
    };
  LinkedBlockingQueue q = new LinkedBlockingQueue();
  this._executor = new ThreadPoolExecutor(1, 1/*max unused*/,
                                          120, TimeUnit.SECONDS, q, tf);
}
 
@Before
public void setup() {
    eventFactory = new TestEventHolderFactory();
    channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();

    byteBuffers = new LinkedBlockingQueue<>();
    byteBuffers.add(ByteBuffer.allocate(4096));

    events = new LinkedBlockingQueue<>();
    logger = Mockito.mock(ComponentLog.class);

    maxConnections = 1;
    sslContext = null;
    charset = StandardCharsets.UTF_8;

    dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
            maxConnections, sslContext, charset);

}
 
源代码6 项目: game-server   文件: HandlerExecutor.java
/**
 * 
 * @param corePoolSize 最小线程数,包括空闲线程
 * @param maxPoolSize 最大线程数 
 * @param keepAliveTime 当线程数大于核心时,终止多余的空闲线程等待新任务的最长时间
 * @param cacheSize 执行队列大小
 * @param prefix 线程池前缀名称
 */
public HandlerExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, int cacheSize, String prefix) {
	TimeUnit unit = TimeUnit.MINUTES;
	/**
	 * 任务队列
	 */
	LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
	/**
	 * 队例满到无法接受新任务时。直接抛弃
	 */
	RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
	
	if (prefix == null) {
		prefix = "";
	}
	ThreadFactory threadFactory = new HandlerThreadFactory(prefix);
	pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
 
@Test(timeout = 60000)
public void testWatchIndefinitely() throws Exception {
    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
    ownerships.add(new ServerLoad("server1"));
    final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
        new LinkedBlockingQueue<TreeSet<ServerLoad>>();
    PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
        @Override
        public void callback(TreeSet<ServerLoad> serverLoads) {
            serverLoadNotifications.add(serverLoads);
        }
    };
    zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
    zkPlacementStateManager.watch(callback);
    // cannot verify the callback here as it may call before the verify is called

    zkPlacementStateManager.saveOwnership(ownerships);
    assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));

    ServerLoad server2 = new ServerLoad("server2");
    server2.addStream(new StreamLoad("hella-important-stream", 415));
    ownerships.add(server2);
    zkPlacementStateManager.saveOwnership(ownerships);
    assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
}
 
源代码8 项目: ditto   文件: HttpPushFactoryTest.java
private void newBinding() {
    requestQueue = new LinkedBlockingQueue<>();
    responseQueue = new LinkedBlockingQueue<>();

    final Flow<HttpRequest, HttpResponse, NotUsed> handler =
            Flow.fromGraph(KillSwitches.<HttpRequest>single())
                    .mapAsync(1, request -> {
                        requestQueue.offer(request);
                        return responseQueue.take();
                    })
                    .mapMaterializedValue(killSwitch -> {
                        Objects.requireNonNull(killSwitchTrigger.peek())
                                .thenAccept(_void -> killSwitch.shutdown());
                        return NotUsed.getInstance();
                    });
    binding = Http.get(actorSystem).bindAndHandle(handler, ConnectHttp.toHost("127.0.0.1", 0), mat)
            .toCompletableFuture()
            .join();
}
 
/**
 * Create a PeriodicNotificationApplication.
 * @param conf - Configuration object that specifies the parameters needed to create the application
 * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
 * @throws PeriodicApplicationException
 */
public static PeriodicNotificationApplication getPeriodicApplication(final PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException {
    final Properties kafkaConsumerProps = getKafkaConsumerProperties(conf);
    final Properties kafkaProducerProps = getKafkaProducerProperties(conf);

    final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
    final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
    final BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();

    FluoClient fluo = null;
    try {
        final PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
        fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
        final NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
        addRegisteredNotices(coordinator, fluo.newSnapshot());
        final KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets);
        final PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
        final NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
        final KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps);
        return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
                .setProcessor(processor).setPruner(pruner).build();
    } catch (AccumuloException | AccumuloSecurityException e) {
        throw new PeriodicApplicationException(e.getMessage());
    }
}
 
@Override
public synchronized void start(StartContext context) throws StartException {
    final String namePattern = "External Management Request Threads -- %t";
    final ThreadFactory threadFactory = doPrivileged(new PrivilegedAction<ThreadFactory>() {
        public ThreadFactory run() {
            return new JBossThreadFactory(threadGroup, Boolean.FALSE, null, namePattern, null, null);
        }
    });

    int poolSize = getPoolSize();
    if (EnhancedQueueExecutor.DISABLE_HINT) {
        final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(WORK_QUEUE_SIZE);
        executorService = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS,
                workQueue, threadFactory);
    } else {
        executorService = new EnhancedQueueExecutor.Builder()
            .setCorePoolSize(poolSize)
            .setMaximumPoolSize(poolSize)
            .setKeepAliveTime(60L, TimeUnit.SECONDS)
            .setMaximumQueueSize(WORK_QUEUE_SIZE)
            .setThreadFactory(threadFactory)
            .build();
    }
}
 
源代码11 项目: soul   文件: HttpSyncDataService.java
private void start(final HttpConfig httpConfig) {
    // init RestTemplate
    OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
    factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
    factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
    this.httpClient = new RestTemplate(factory);
    // It could be initialized multiple times, so you need to control that.
    if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        this.fetchGroupConfig(ConfigGroupEnum.values());
        // one thread for listener, another one for fetch configuration data.
        this.executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("http-long-polling", true));
        // start long polling.
        this.executor.execute(new HttpLongPollingTask());
    } else {
        log.info("soul http long polling was started, executor=[{}]", executor);
    }
}
 
源代码12 项目: actioncable-client-java   文件: SubscriptionTest.java
@Test
public void onConnectedByCustomInterface() throws URISyntaxException, InterruptedException {
    final BlockingQueue<String> events = new LinkedBlockingQueue<String>();

    final Consumer consumer = new Consumer(new URI("ws://example.com:28080"));
    final Channel channel = new Channel("CommentsChannel");
    final Subscription subscription = consumer.getSubscriptions().create(channel, CustomSubscription.class);

    final Subscription returned = subscription.onConnected(new Subscription.ConnectedCallback() {
        @Override
        public void call() {
            events.offer("onConnected");
        }
    });
    assertThat(returned, is(theInstance(subscription)));

    consumer.getSubscriptions().notifyConnected(subscription.getIdentifier());

    assertThat(events.take(), is("onConnected"));
}
 
源代码13 项目: flink   文件: String2HashJoinOperatorTest.java
public static LinkedBlockingQueue<Object> transformToBinary(LinkedBlockingQueue<Object> output) {
	LinkedBlockingQueue<Object> ret = new LinkedBlockingQueue<>();
	for (Object o : output) {
		BaseRow row = ((StreamRecord<BaseRow>) o).getValue();
		BinaryRow binaryRow;
		if (row.isNullAt(0)) {
			binaryRow = newRow(row.getString(2).toString(), row.getString(3) + "null");
		} else if (row.isNullAt(2)) {
			binaryRow = newRow(row.getString(0).toString(), row.getString(1) + "null");
		} else {
			String value1 = row.getString(1).toString();
			String value2 = row.getString(3).toString();
			binaryRow = newRow(row.getString(0).toString(), value1 + value2);
		}
		ret.add(new StreamRecord(binaryRow));
	}
	return ret;
}
 
源代码14 项目: beast   文件: BqQueueWorkerTest.java
@Test
public void shouldReadFromQueueForeverAndPushToSink() throws InterruptedException {
    BlockingQueue<Records> queue = new LinkedBlockingQueue<>();
    BqQueueWorker worker = new BqQueueWorker("bq-worker", successfulSink, queueConfig, committer, queue, workerState);
    Records messages2 = mock(Records.class);
    when(committer.acknowledge(any())).thenReturn(true);
    queue.put(messages);
    queue.put(messages2);

    Thread workerThread = new Thread(worker);
    workerThread.start();

    await().atMost(10, TimeUnit.SECONDS).until(() -> queue.isEmpty());
    workerState.closeWorker();
    workerThread.join();
    verify(successfulSink).push(messages);
    verify(successfulSink).push(messages2);
}
 
源代码15 项目: protect   文件: ServerCommunicationSystem.java
/**
 * Creates a new instance of ServerCommunicationSystem
 */
public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception {
	super("Server CS");

	this.controller = controller;

	inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize());

	// create a new conf, with updated port number for servers
	// TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(),
	// Configuration.getHomeDir(), "hosts.config");

	// serversConf.increasePortNumber();

	serversConn = new ServersCommunicationLayer(controller, inQueue, replica);

	messageHandler = new MessageHandler(controller.getStaticConf().getHmacAlgorithm());

	// ******* EDUARDO BEGIN **************//
	// if (manager.isInCurrentView() || manager.isInInitView()) {
	clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
	// }
	// ******* EDUARDO END **************//
	// start();
}
 
源代码16 项目: nifi   文件: TestBeatsSocketChannelHandler.java
@Before
public void setup() {
    eventFactory = new TestEventHolderFactory();
    channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();

    byteBuffers = new LinkedBlockingQueue<>();
    byteBuffers.add(ByteBuffer.allocate(4096));

    events = new LinkedBlockingQueue<>();
    logger = Mockito.mock(ComponentLog.class);

    maxConnections = 1;
    sslContext = null;
    charset = StandardCharsets.UTF_8;

    dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
            maxConnections, sslContext, charset);

}
 
源代码17 项目: dubbo3   文件: LimitedThreadPool.java
public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
    		queues == 0 ? new SynchronousQueue<>() :
    			(queues < 0 ? new LinkedBlockingQueue<>()
    					: new LinkedBlockingQueue<>(queues)),
    		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
 
源代码18 项目: radar   文件: ServiceTest.java
public static void start() {
	if (!flag) {
		synchronized (lockObj) {
			if (!flag) {
				flag = true;
				executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
						new LinkedBlockingQueue<Runnable>(), SoaThreadFactory.create("HeartBeatSevice", true));
				doHt();
			}
		}
	}
}
 
源代码19 项目: Neptune   文件: PActivityStackSupervisor.java
/**
 * 获取对应插件缓存还未执行加载的Intent
 */
public static LinkedBlockingQueue<IntentRequest> getCachedIntent(String pkgName) {
    if (TextUtils.isEmpty(pkgName)) {
        return null;
    }
    return sIntentCacheMap.get(pkgName);
}
 
源代码20 项目: alipay-sdk-java-all   文件: WebSocketImpl.java
/**
 * creates a websocket with client role
 *
 * @param listener The listener for this instance
 * @param draft    The draft which should be used
 */
public WebSocketImpl(WebSocketListener listener, Draft draft) {
    if (listener == null || (draft == null && role
            == Role.SERVER))// socket can be null because we want do be able to create the object without already having a bound channel
    { throw new IllegalArgumentException("parameters must not be null"); }
    this.outQueue = new LinkedBlockingQueue<ByteBuffer>();
    inQueue = new LinkedBlockingQueue<ByteBuffer>();
    this.wsl = listener;
    this.role = Role.CLIENT;
    if (draft != null) { this.draft = draft.copyInstance(); }
}
 
源代码21 项目: tunnel   文件: EsPublisherTest.java
@Test
public void test_BlockingQueue() {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
    for (int i = 0; i < 16; i++) {
        queue.add("idx-" + i);
    }
    List<String> list = new ArrayList<>();
    queue.drainTo(list, 8);
    Assert.assertEquals(queue.size(), list.size());

}
 
源代码22 项目: AssistantBySDK   文件: MobileCommProcessor.java
public MobileCommProcessor(Context mContext, SystemVoiceMediator mediator, Handler handler) {
    super(mContext, mediator);
    this.mHandler = handler;
    mAppConfig = (AppConfig) ((Service) mContext).getApplication();
    tPools = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    phoneCallListener();
    // 注册收件箱内容观察者
    mContext.getContentResolver().registerContentObserver(Uri.parse(PhoneContactUtils.SMS_URI_INBOX),
            true, new SmsObserver(handler));
    // 注册联系人内容观察者
    mContext.getContentResolver().registerContentObserver(ContactsContract.Contacts.CONTENT_URI,
            true, new ContactObserver(handler));
}
 
源代码23 项目: kylin-on-parquet-v2   文件: HBaseConnection.java
public static ExecutorService getCoprocessorPool() {
    if (coprocessorPool != null) {
        return coprocessorPool;
    }

    synchronized (HBaseConnection.class) {
        if (coprocessorPool != null) {
            return coprocessorPool;
        }

        KylinConfig config = KylinConfig.getInstanceFromEnv();

        // copy from HConnectionImplementation.getBatchPool()
        int maxThreads = config.getHBaseMaxConnectionThreads();
        int coreThreads = config.getHBaseCoreConnectionThreads();
        long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
                Threads.newDaemonThreadFactory("kylin-coproc-"));
        tpe.allowCoreThreadTimeOut(true);

        logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);

        coprocessorPool = tpe;
        return coprocessorPool;
    }
}
 
@Override
public boolean start() {

  if (running.getAndSet(true)) {
    return false;
  }
  else {
    queue = new LinkedBlockingQueue<Runnable>();
    executorService.submit(queueRunnable);
    return true;
  }
}
 
源代码25 项目: tez   文件: DeletionTrackerImpl.java
public DeletionTrackerImpl(Configuration conf) {
  super(conf);
  this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT,
      TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleDeleteTracker #%d").build());
}
 
源代码26 项目: oxTrust   文件: MetadataValidationTimer.java
@PostConstruct
public void init() {
	this.isActive = new AtomicBoolean(true);
	try {
		this.metadataUpdates = new LinkedBlockingQueue<String>();
	} finally {
		this.isActive.set(false);
	}
}
 
@Test(timeout = TIMEOUT)
public void removeWhenIdentifierIsNotUnique() throws IOException, InterruptedException {
    final BlockingQueue<String> events = new LinkedBlockingQueue<String>();

    final MockWebServer mockWebServer = new MockWebServer();
    final MockResponse response = new MockResponse();
    response.withWebSocketUpgrade(new DefaultWebSocketListener() {
        @Override
        public void onMessage(BufferedSource payload, WebSocket.PayloadType type) throws IOException {
            events.offer("onMessage:" + payload.readUtf8());
            payload.close();
        }
    });
    mockWebServer.enqueue(response);

    final Consumer consumer = new Consumer(mockWebServer.url("/").uri());
    final Subscriptions subscriptions = consumer.getSubscriptions();

    final Subscription subscription1 = subscriptions.create(new Channel("CommentsChannel"));
    // Channel is same as subscription1
    final Subscription subscription2 = subscriptions.create(new Channel("CommentsChannel"));

    consumer.connect();

    events.take(); // WebSocketListener#onMessage (subscribe)
    events.take(); // WebSocketListener#onMessage (subscribe)

    subscriptions.remove(subscription1);

    assertThat(subscriptions.contains(subscription1), is(false));
    assertThat(subscriptions.contains(subscription2), is(true));

    assertThat(events.take(), is("onMessage:" + Command.unsubscribe(subscription1.getIdentifier()).toJson()));

    mockWebServer.shutdown();
}
 
源代码28 项目: gemfirexd-oss   文件: RemoteQueryListener.java
public RemoteQueryListener() {
  final ThreadGroup queryThreadGroup = new ThreadGroup(
      "Query Executor Thread Group");
  ThreadFactory queryThreadFactory = new ThreadFactory() {
    public Thread newThread(Runnable command)
    {
      return new Thread(queryThreadGroup, command, queryThreadGroup.getName()
          + " Query Executor Thread ");
    }
  };
  queryExecutorPool = new ThreadPoolExecutor(MAX_THREADS, MAX_THREADS, 15,
      TimeUnit.SECONDS, new LinkedBlockingQueue(), queryThreadFactory);

}
 
源代码29 项目: dubbox   文件: LimitedThreadPool.java
public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
    		queues == 0 ? new SynchronousQueue<Runnable>() : 
    			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
    					: new LinkedBlockingQueue<Runnable>(queues)),
    		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
 
源代码30 项目: lucene-solr   文件: ConcurrentQueryLoader.java
/**
 * Create a new ConcurrentQueryLoader
 *
 * @param monitor   the Monitor to load queries to
 * @param threads   the number of threads to use
 * @param queueSize the size of the buffer to hold queries in
 */
public ConcurrentQueryLoader(Monitor monitor, int threads, int queueSize) {
  this.monitor = monitor;
  this.queue = new LinkedBlockingQueue<>(queueSize);
  this.executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory("loader"));
  this.shutdownLatch = new CountDownLatch(threads);
  for (int i = 0; i < threads; i++) {
    this.executor.submit(new Worker(queueSize / threads));
  }
}