下面列出了java.util.concurrent.PriorityBlockingQueue#size ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Scan the priority queue of queries with a specified deadline, halting any
* queries whose deadline has expired.
*/
static private void checkDeadlines(final long nowNanos,
final PriorityBlockingQueue<QueryDeadline> deadlineQueue) {
/*
* While the queue is thread safe, we want at most one thread at a time
* to be inspecting the queue for queries whose deadlines have expired.
*/
synchronized (deadlineQueue) {
/*
* Check the head of the deadline queue for any queries whose
* deadline has expired.
*/
checkHeadOfDeadlineQueue(nowNanos, deadlineQueue);
if (deadlineQueue.size() > DEADLINE_QUEUE_SCAN_SIZE) {
/*
* Scan the deadline queue, removing entries for expired
* queries.
*/
scanDeadlineQueue(nowNanos, deadlineQueue);
}
}
}
@Override
public void enqueueLog(Path log) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(log);
queues.put(logPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(logPrefix, queue);
}
} else {
queue.put(log);
}
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
this.replicationQueueInfo.getQueueId());
}
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of "
+ "replication.source.log.queue.warn: {}", logPeerId(),
logPrefix, queueSize, logQueueWarnThreshold);
}
}
@Override
public int pendingQueueSize(String gatewayId)
{
PriorityBlockingQueue<OutboundMessage> queue = queueMap.get(gatewayId);
if (queue == null) return 0;
return queue.size();
}