java.util.concurrent.BlockingQueue#remainingCapacity ( )源码实例Demo

下面列出了 java.util.concurrent.BlockingQueue#remainingCapacity() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: openjdk-jdk9   文件: ArrayBlockingQueueTest.java
/**
 * Verifies that remainingCapacity grows by one for each element removed and
 * shrinks by one for each element added, while size() + remainingCapacity()
 * stays equal to the total capacity throughout.
 */
public void testRemainingCapacity() {
    int count = ThreadLocalRandom.current().nextInt(1, SIZE);
    BlockingQueue queue = populatedQueue(count, count, 2 * count, false);
    int initialSpare = queue.remainingCapacity();
    int total = initialSpare + count;

    // Drain phase: the removals are expected to yield 0, 1, 2, ... in order.
    for (int removed = 0; removed < count; removed++) {
        assertEquals(initialSpare + removed, queue.remainingCapacity());
        assertEquals(total, queue.size() + queue.remainingCapacity());
        assertEquals(removed, queue.remove());
    }

    // Refill phase: the queue is empty here, so the whole capacity is spare.
    for (int added = 0; added < count; added++) {
        assertEquals(total - added, queue.remainingCapacity());
        assertEquals(total, queue.size() + queue.remainingCapacity());
        assertTrue(queue.add(added));
    }
}
 
源代码2 项目: incubator-gobblin   文件: CouchbaseWriterTest.java
/**
 * Removes entries from the head of {@code queue} until its remaining capacity
 * reaches {@code threshold}, waiting on each entry's future before removing it.
 *
 * NOTE(review): when {@code sleepTime <= 0}, or while the queue is empty, the
 * loop body does nothing and this method spins until some other party changes
 * the queue's remaining capacity.
 *
 * @param queue         queue of (document, future) pairs to drain
 * @param threshold     remaining capacity at which draining stops
 * @param sleepTime     maximum time to wait on each head entry's future
 * @param sleepUnit     unit of {@code sleepTime}
 * @param failedFutures receives every entry whose future wait threw an exception
 */
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue, int threshold, long sleepTime,
    TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
  while (queue.remainingCapacity() < threshold) {
    if (sleepTime <= 0) {
      continue;
    }

    Pair<AbstractDocument, Future> head = queue.peek();
    if (head == null) {
      continue;
    }

    try {
      head.getSecond().get(sleepTime, sleepUnit);
    } catch (Exception waitFailure) {
      // Any failure (timeout included) marks the entry as failed; it is still removed below.
      failedFutures.add(head);
    }
    queue.poll();
  }
}
 
/**
 * Creates a new bounded queue with capacity {@code newSize} and moves every
 * frame currently held in {@code cache} into it.
 *
 * @param cache   queue whose frames are drained into the replacement queue
 * @param newSize capacity of the replacement queue
 * @return a new {@link LinkedBlockingQueue} containing the drained frames
 * @throws RuntimeException if {@code cache} currently holds more frames than
 *                          {@code newSize} allows
 */
private BlockingQueue<SrsFlvFrame> resizeFlvTagCacheInternal(BlockingQueue<SrsFlvFrame> cache, int newSize) {
  // What has to fit is the number of frames currently queued, i.e. size().
  // Subtracting remainingCapacity() would let a too-small newSize through and
  // make drainTo fail halfway with "Queue full".
  if (newSize < cache.size()) {
    throw new RuntimeException("Can't fit current cache inside new cache size");
  }

  BlockingQueue<SrsFlvFrame> newQueue = new LinkedBlockingQueue<>(newSize);
  cache.drainTo(newQueue);
  return newQueue;
}
 
/**
 * Internal method used *ONLY* within tests.
 *
 * A buffer counts as throttled when its total capacity (remaining capacity plus
 * current size) equals the throttled buffer size. The two values are read
 * separately, so this could have race-conditions in other use-cases.
 *
 * @return set of all VirtualSpoutIds that ARE throttled
 */
Set<VirtualSpoutIdentifier> getThrottledVirtualSpoutIdentifiers() {
    final Set<VirtualSpoutIdentifier> throttled = new HashSet<>();

    for (final Map.Entry<VirtualSpoutIdentifier, BlockingQueue<Message>> bufferEntry : messageBuffer.entrySet()) {
        final BlockingQueue<Message> buffer = bufferEntry.getValue();
        final int totalCapacity = buffer.remainingCapacity() + buffer.size();
        if (totalCapacity != getThrottledBufferSize()) {
            continue;
        }
        throttled.add(bufferEntry.getKey());
    }
    return throttled;
}
 
/**
 * Internal method used *ONLY* within tests.
 *
 * A buffer counts as non-throttled when its total capacity (remaining capacity
 * plus current size) is greater than the throttled buffer size. The two values
 * are read separately, so this could have race-conditions in other use-cases.
 *
 * @return set of all VirtualSpoutIds that are NOT throttled.
 */
Set<VirtualSpoutIdentifier> getNonThrottledVirtualSpoutIdentifiers() {
    final Set<VirtualSpoutIdentifier> nonThrottled = new HashSet<>();

    for (final Map.Entry<VirtualSpoutIdentifier, BlockingQueue<Message>> bufferEntry : messageBuffer.entrySet()) {
        final BlockingQueue<Message> buffer = bufferEntry.getValue();
        final int totalCapacity = buffer.remainingCapacity() + buffer.size();
        final boolean largerThanThrottled = totalCapacity > getThrottledBufferSize();
        if (largerThanThrottled) {
            nonThrottled.add(bufferEntry.getKey());
        }
    }
    return nonThrottled;
}
 
源代码6 项目: hadoop   文件: FairCallQueue.java
/**
 * Returns maximum remaining capacity. This does not reflect how much you can
 * ideally fit in this FairCallQueue, as that would depend on the scheduler's
 * decisions.
 *
 * @return the sum of the remaining capacities of all sub-queues, capped at
 *         {@link Integer#MAX_VALUE}
 */
@Override
public int remainingCapacity() {
  // Accumulate in a long: a queue without an intrinsic limit reports
  // Integer.MAX_VALUE, so an int sum over several queues could wrap negative.
  long sum = 0;
  for (BlockingQueue<?> q : this.queues) {
    sum += q.remainingCapacity();
  }
  return (int) Math.min(sum, Integer.MAX_VALUE);
}
 
源代码7 项目: big-c   文件: FairCallQueue.java
/**
 * Returns maximum remaining capacity. This does not reflect how much you can
 * ideally fit in this FairCallQueue, as that would depend on the scheduler's
 * decisions.
 *
 * @return the sum of the remaining capacities of all sub-queues, capped at
 *         {@link Integer#MAX_VALUE}
 */
@Override
public int remainingCapacity() {
  // Accumulate in a long: a queue without an intrinsic limit reports
  // Integer.MAX_VALUE, so an int sum over several queues could wrap negative.
  long total = 0L;
  for (BlockingQueue<?> q : this.queues) {
    total += q.remainingCapacity();
  }
  return total > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) total;
}
 
源代码8 项目: framework   文件: MessageThreadPoolExecutor.java
/**
 * Description: Processes a message through the thread pool of its channel.<br>
 * <p>
 * The executor for the channel is created on first use. While the executor's
 * queue has no remaining capacity and the pool is at its maximum size, this
 * method sleeps and re-checks before submitting. If the thread is interrupted
 * while waiting, the error is logged, the interrupt flag is restored and the
 * message is NOT submitted.
 *
 * @author Wang Wei<br>
 * @taskId <br>
 * @param channel channel name; also used as the key into the executor map and as the lock object
 * @param message task to run on the channel's executor <br>
 */
public static void execute(final String channel, final Runnable message) {
    // NOTE(review): this locks on the String instance itself, so two equal but
    // distinct String objects do not exclude each other -- confirm callers pass
    // a shared instance per channel.
    synchronized (channel) {
        ThreadPoolExecutor executor = executorMap.get(channel);
        if (executor == null) {
            executor = createThreadPoolExecutor();
            executorMap.put(channel, executor);
        }
        BlockingQueue<Runnable> bq = executor.getQueue();

        // When the pool's queue is backed up, pause fetching from redis
        try {
            long count = 0;
            while (bq.remainingCapacity() == 0 && executor.getMaximumPoolSize() == executor.getPoolSize()) {
                // Log only on every NUM_100-th iteration to avoid flooding the log.
                if (count++ % NUM_100 == 0) {
                    LoggerUtil.debug("wait message[{0}] execute, current pool size is [{1}]", channel, bq.size());
                }
                Thread.sleep(NUM_100);
            }
            executor.execute(message);
        }
        catch (InterruptedException e) {
            LoggerUtil.error(e);
            // Restore the interrupt status so code further up the stack can see it.
            Thread.currentThread().interrupt();
        }

    }
}
 
/**
 * Creates a producer for the given queue and registers it with this scheduler.
 *
 * The queue's remaining capacity is sampled once here. Both water marks are
 * -1 when that capacity is Integer.MAX_VALUE or 0, or when both configured
 * watermark percentages are non-positive; otherwise the low mark is the floor
 * and the high mark the ceiling of capacity times the respective percentage.
 *
 * @param queue       queue the producer will feed
 * @param rateLimiter rate limiter passed on to the producer
 * @return a blocking or dropping producer, depending on the {@code blocking} setting
 */
@Override
public PacketScheduler.Producer<T> attach(BlockingQueue<T> queue, RateLimiter rateLimiter) {
   final int capacity = queue.remainingCapacity();

   // Start from the "no water marks" setting and override it only when applicable.
   int lowWaterMark = -1;
   int highWaterMark = -1;
   final boolean waterMarksDisabled =
      Integer.MAX_VALUE == capacity || 0 == capacity || (lowWatermarkPercent <= 0 && highWatermarkPercent <= 0);
   if (!waterMarksDisabled) {
      lowWaterMark = (int)Math.floor(capacity * lowWatermarkPercent);
      highWaterMark = (int)Math.ceil(capacity * highWatermarkPercent);
   }

   final AbstractProducer<T> producer = blocking
      ? new BlockingProducer<T>(
           available, queue, rateLimiter, dropHandler.get(), queueHandler.get(), lowWaterMark, highWaterMark)
      : new DroppingProducer<T>(
           available, queue, rateLimiter, dropHandler.get(), queueHandler.get(), lowWaterMark, highWaterMark);

   // Registration and the cached producer count are updated under the same lock.
   synchronized (producers) {
      producers.add(producer);
      numProducers = producers.size();
   }

   return producer;
}
 
源代码10 项目: porcupine   文件: InstrumentedThreadPoolExecutor.java
/**
 * Creates the executor by passing every argument through to the superclass
 * constructor, then seeds {@code minRemainingQueueCapacity} with the work
 * queue's remaining capacity at construction time.
 *
 * @param corePoolSize    forwarded unchanged to the superclass constructor
 * @param maximumPoolSize forwarded unchanged to the superclass constructor
 * @param keepAliveTime   forwarded unchanged to the superclass constructor
 * @param unit            forwarded unchanged to the superclass constructor
 * @param workQueue       forwarded to the superclass constructor; also the source of the
 *                        initial {@code minRemainingQueueCapacity}
 * @param threadFactory   forwarded unchanged to the superclass constructor
 * @param handler         forwarded unchanged to the superclass constructor
 */
public InstrumentedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    // NOTE(review): presumably tracked as a running minimum elsewhere in the class -- confirm.
    this.minRemainingQueueCapacity = workQueue.remainingCapacity();
}
 
源代码11 项目: streamsupport   文件: JSR166TestCase.java
/**
 * Computes how many tasks can be submitted to the given pool (which is
 * expected to use a bounded queue) before it is saturated, i.e. before a
 * further submission throws RejectedExecutionException.
 *
 * The result is the maximum pool size plus the queue's total capacity
 * (current size plus remaining capacity).
 */
static final int saturatedSize(ThreadPoolExecutor pool) {
    final BlockingQueue<Runnable> workQueue = pool.getQueue();
    final int maxThreads = pool.getMaximumPoolSize();
    final int queueCapacity = workQueue.size() + workQueue.remainingCapacity();
    return maxThreads + queueCapacity;
}