下面列出了 java.util.concurrent.BlockingQueue#remainingCapacity() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Checks that remainingCapacity follows queue occupancy: every removal frees
 * one slot, every insertion consumes one, and size plus remaining capacity
 * stays equal to the same total throughout.
 */
public void testRemainingCapacity() {
    final int count = ThreadLocalRandom.current().nextInt(1, SIZE);
    final BlockingQueue queue = populatedQueue(count, count, 2 * count, false);
    final int initialSpare = queue.remainingCapacity();
    final int total = initialSpare + count;

    // Phase 1: empty the queue; each removal must free exactly one slot.
    for (int removed = 0; removed < count; removed++) {
        assertEquals(initialSpare + removed, queue.remainingCapacity());
        assertEquals(total, queue.size() + queue.remainingCapacity());
        assertEquals(removed, queue.remove());
    }

    // Phase 2: refill the queue; each insertion must consume exactly one slot.
    for (int added = 0; added < count; added++) {
        assertEquals(total - added, queue.remainingCapacity());
        assertEquals(total, queue.size() + queue.remainingCapacity());
        assertTrue(queue.add(added));
    }
}
/**
 * Waits on queued futures, starting from the head of the queue, until the queue
 * reports at least {@code threshold} free slots.
 *
 * <p>While {@code queue.remainingCapacity() < threshold}, the head element is
 * peeked, its future is awaited for at most {@code sleepTime}, and the head of
 * the queue is then removed. An element whose wait ends in any exception
 * (timeout included) is appended to {@code failedFutures}.
 *
 * <p>The loop ends early in two cases: the queue is empty (nothing is left to
 * drain, so waiting cannot free more capacity), or the calling thread is
 * interrupted while waiting. On interruption the current element is still
 * recorded as failed and removed, and the thread's interrupt status is restored
 * for the caller.
 *
 * <p>NOTE(review): when {@code sleepTime <= 0} and the queue is not empty, nothing
 * is awaited or removed, so the loop spins until another thread frees capacity.
 * Confirm that this is intended.
 *
 * @param queue         queue of (document, future) pairs to drain
 * @param threshold     minimum remaining capacity required before returning
 * @param sleepTime     maximum time to wait on each future
 * @param sleepUnit     unit of {@code sleepTime}
 * @param failedFutures receives every element whose future did not complete normally
 */
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue, int threshold, long sleepTime,
        TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
    while (queue.remainingCapacity() < threshold) {
        Pair<AbstractDocument, Future> topElement = queue.peek();
        if (topElement == null) {
            // An empty queue cannot yield more capacity; looping would never terminate.
            break;
        }
        if (sleepTime > 0) {
            boolean interrupted = false;
            try {
                topElement.getSecond().get(sleepTime, sleepUnit);
            } catch (Exception te) {
                failedFutures.add(topElement);
                interrupted = te instanceof InterruptedException;
            }
            queue.poll();
            if (interrupted) {
                // Do not swallow the interrupt: hand it back to the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
/**
 * Moves every frame from {@code cache} into a newly created bounded queue with
 * capacity {@code newSize} and returns that queue.
 *
 * @param cache   queue whose contents are drained into the new queue
 * @param newSize capacity of the new queue
 * @return a new {@link LinkedBlockingQueue} holding the frames previously in {@code cache}
 * @throws RuntimeException         if {@code cache} currently holds more frames than {@code newSize}
 * @throws IllegalArgumentException if {@code newSize} is not greater than zero
 *                                  (raised by the {@link LinkedBlockingQueue} constructor)
 */
private BlockingQueue<SrsFlvFrame> resizeFlvTagCacheInternal(BlockingQueue<SrsFlvFrame> cache, int newSize) {
    // What has to fit is the number of frames currently queued. Subtracting the
    // remaining capacity would let an oversized cache through and make drainTo
    // fail part-way with "Queue full".
    if (newSize < cache.size()) {
        throw new RuntimeException("Can't fit current cache inside new cache size");
    }
    BlockingQueue<SrsFlvFrame> newQueue = new LinkedBlockingQueue<>(newSize);
    cache.drainTo(newQueue);
    return newQueue;
}
/**
 * Test-only helper; not meant for production code paths.
 *
 * A buffer counts as throttled when its total capacity (free slots plus queued
 * messages) equals the throttled buffer size. The check is a heuristic and is
 * not safe against concurrent modification of the buffers.
 *
 * @return identifiers of every VirtualSpout whose buffer is throttled
 */
Set<VirtualSpoutIdentifier> getThrottledVirtualSpoutIdentifiers() {
    final Set<VirtualSpoutIdentifier> throttled = new HashSet<>();
    for (final Map.Entry<VirtualSpoutIdentifier, BlockingQueue<Message>> bufferEntry : messageBuffer.entrySet()) {
        final BlockingQueue<Message> buffer = bufferEntry.getValue();
        final int totalCapacity = buffer.remainingCapacity() + buffer.size();
        if (totalCapacity != getThrottledBufferSize()) {
            continue;
        }
        throttled.add(bufferEntry.getKey());
    }
    return throttled;
}
/**
 * Test-only helper; not meant for production code paths.
 *
 * A buffer counts as non-throttled when its total capacity (free slots plus
 * queued messages) is larger than the throttled buffer size. The check is a
 * heuristic and is not safe against concurrent modification of the buffers.
 *
 * @return identifiers of every VirtualSpout whose buffer is NOT throttled
 */
Set<VirtualSpoutIdentifier> getNonThrottledVirtualSpoutIdentifiers() {
    final Set<VirtualSpoutIdentifier> unthrottled = new HashSet<>();
    for (final Map.Entry<VirtualSpoutIdentifier, BlockingQueue<Message>> bufferEntry : messageBuffer.entrySet()) {
        final BlockingQueue<Message> buffer = bufferEntry.getValue();
        final int totalCapacity = buffer.remainingCapacity() + buffer.size();
        final boolean largerThanThrottled = totalCapacity > getThrottledBufferSize();
        if (largerThanThrottled) {
            unthrottled.add(bufferEntry.getKey());
        }
    }
    return unthrottled;
}
/**
 * Returns the maximum remaining capacity: the sum of the remaining capacities
 * of all sub-queues, saturated at {@link Integer#MAX_VALUE}. This does not
 * reflect how much you can ideally fit in this FairCallQueue, as that would
 * depend on the scheduler's decisions.
 *
 * @return total free slots across all sub-queues, never more than
 *         {@code Integer.MAX_VALUE}
 */
@Override
public int remainingCapacity() {
    // Accumulate in a long: sub-queues may each report up to Integer.MAX_VALUE,
    // and an int sum would wrap around to a negative value.
    long sum = 0;
    for (BlockingQueue<?> q : this.queues) {
        sum += q.remainingCapacity();
    }
    return (int) Math.min(sum, Integer.MAX_VALUE);
}
/**
 * Returns the maximum remaining capacity, computed as the total of every
 * sub-queue's remaining capacity and capped at {@link Integer#MAX_VALUE}.
 * This does not reflect how much you can ideally fit in this FairCallQueue,
 * as that would depend on the scheduler's decisions.
 *
 * @return combined free slots of all sub-queues, at most
 *         {@code Integer.MAX_VALUE}
 */
@Override
public int remainingCapacity() {
    // A long accumulator prevents int wrap-around when the individual
    // capacities are large (for example Integer.MAX_VALUE for an unbounded queue).
    long total = 0L;
    for (BlockingQueue<?> queue : this.queues) {
        total += queue.remainingCapacity();
    }
    return total > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) total;
}
/**
 * Description: processes a message through the thread pool of a channel.<br>
 *
 * <p>The executor for {@code channel} is looked up in {@code executorMap} and
 * created on first use. While the executor's queue has no remaining capacity
 * and the pool is already at its maximum size, the calling thread sleeps in
 * steps of {@code NUM_100} milliseconds; once that is no longer the case the
 * message is submitted.
 *
 * <p>If the calling thread is interrupted while waiting, the message is NOT
 * submitted: the error is logged and the thread's interrupt status is restored
 * so that callers can observe it.
 *
 * <p>NOTE(review): the lock is the {@code channel} String object itself. Two
 * equal but distinct String instances do not exclude each other, and interned
 * literals are shared JVM-wide. Confirm how callers obtain {@code channel}.
 *
 * @author 王伟<br>
 * @taskId <br>
 * @param channel channel name used as the key into {@code executorMap} and as the lock
 * @param message task to hand to the channel's executor
 */
public static void execute(final String channel, final Runnable message) {
    synchronized (channel) {
        ThreadPoolExecutor executor = executorMap.get(channel);
        if (executor == null) {
            executor = createThreadPoolExecutor();
            executorMap.put(channel, executor);
        }
        BlockingQueue<Runnable> bq = executor.getQueue();
        // When the thread pool's queue is saturated, pause here so that the
        // caller stops fetching further messages from redis.
        try {
            long count = 0;
            while (bq.remainingCapacity() == 0 && executor.getMaximumPoolSize() == executor.getPoolSize()) {
                // Log only on every NUM_100-th iteration to avoid flooding the log.
                if (count++ % NUM_100 == 0) {
                    LoggerUtil.debug("wait message[{0}] execute, current pool size is [{1}]", channel, bq.size());
                }
                Thread.sleep(NUM_100);
            }
            executor.execute(message);
        }
        catch (InterruptedException e) {
            LoggerUtil.error(e);
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Creates a producer that feeds {@code queue}, registers it with this
 * scheduler and returns it.
 *
 * <p>Low and high water marks are derived from the queue's remaining capacity
 * at the time of the call, multiplied by the configured percentages (low is
 * rounded down, high is rounded up). Both marks are {@code -1} when that
 * capacity is {@code Integer.MAX_VALUE} or {@code 0}, or when neither
 * percentage is positive.
 *
 * @param queue       queue the new producer is bound to
 * @param rateLimiter rate limiter passed through to the new producer
 * @return the newly registered producer; blocking or dropping depending on
 *         the {@code blocking} setting
 */
@Override
public PacketScheduler.Producer<T> attach(BlockingQueue<T> queue, RateLimiter rateLimiter) {
    final int capacity = queue.remainingCapacity();
    final boolean capacityUsable = capacity != Integer.MAX_VALUE && capacity != 0;
    final boolean anyWatermarkSet = lowWatermarkPercent > 0 || highWatermarkPercent > 0;

    int lowWaterMark = -1;
    int highWaterMark = -1;
    if (capacityUsable && anyWatermarkSet) {
        lowWaterMark = (int) Math.floor(capacity * lowWatermarkPercent);
        highWaterMark = (int) Math.ceil(capacity * highWatermarkPercent);
    }

    final AbstractProducer<T> producer;
    if (!blocking) {
        producer = new DroppingProducer<T>(
            available, queue, rateLimiter,
            dropHandler.get(), queueHandler.get(),
            lowWaterMark, highWaterMark);
    } else {
        producer = new BlockingProducer<T>(
            available, queue, rateLimiter,
            dropHandler.get(), queueHandler.get(),
            lowWaterMark, highWaterMark);
    }

    // Registration and the cached producer count are updated under the same lock.
    synchronized (producers) {
        producers.add(producer);
        numProducers = producers.size();
    }
    return producer;
}
public InstrumentedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.minRemainingQueueCapacity = workQueue.remainingCapacity();
}
/**
 * Computes how many tasks the given pool (which must use a bounded queue) can
 * accept before it is saturated, i.e. before a further submission throws
 * RejectedExecutionException: the maximum pool size plus the total capacity
 * of the work queue (queued tasks plus free slots).
 */
static final int saturatedSize(ThreadPoolExecutor pool) {
    final BlockingQueue<Runnable> workQueue = pool.getQueue();
    final int maxThreads = pool.getMaximumPoolSize();
    final int queued = workQueue.size();
    final int free = workQueue.remainingCapacity();
    return maxThreads + queued + free;
}