下面列出了 java.util.concurrent.PriorityBlockingQueue#add() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that {@code Iterator.remove()} removes the element most recently
 * returned by {@code next()}.
 *
 * <p>The values 2, 1 and 3 are added, the first element produced by an
 * iterator is removed, and a fresh iterator is then expected to yield exactly
 * 2 followed by 3.
 */
public void testIteratorRemove() {
    final PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<Integer>(3);
    q.add(2);
    q.add(1);
    q.add(3);

    // Advance once and remove whatever element the iterator just returned.
    Iterator<Integer> it = q.iterator();
    it.next();
    it.remove();

    // Expected value goes first so a failure message reports it correctly;
    // Integer.valueOf keeps both arguments boxed and avoids the deprecated
    // Integer(int) constructor.
    it = q.iterator();
    assertEquals(Integer.valueOf(2), it.next());
    assertEquals(Integer.valueOf(3), it.next());
    assertFalse(it.hasNext());
}
/**
 * Verifies that {@code drainTo(c)} empties the queue into the collection
 * {@code c}.
 *
 * <p>A queue built by {@code populatedQueue(SIZE)} is drained and the target
 * list is expected to hold the integers {@code 0..SIZE-1} in ascending order.
 * The queue is then refilled with {@code zero} and {@code one} and drained a
 * second time to check that it stays usable after a drain.
 */
public void testDrainTo() {
    // NOTE(review): q stays a raw type because the declared return type of
    // populatedQueue is not visible in this excerpt.
    PriorityBlockingQueue q = populatedQueue(SIZE);
    ArrayList<Integer> l = new ArrayList<Integer>();

    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first, matching the other assertions in this test.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // The queue must accept new elements after having been drained.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));

    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
/**
 * Verifies that {@code Iterator.remove()} removes the element most recently
 * returned by {@code next()}.
 *
 * <p>The values 2, 1 and 3 are added, the first element produced by an
 * iterator is removed, and a fresh iterator is then expected to yield exactly
 * 2 followed by 3.
 */
public void testIteratorRemove() {
    final PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<Integer>(3);
    q.add(2);
    q.add(1);
    q.add(3);

    // Advance once and remove whatever element the iterator just returned.
    Iterator<Integer> it = q.iterator();
    it.next();
    it.remove();

    // Expected value goes first so a failure message reports it correctly;
    // Integer.valueOf keeps both arguments boxed and avoids the deprecated
    // Integer(int) constructor.
    it = q.iterator();
    assertEquals(Integer.valueOf(2), it.next());
    assertEquals(Integer.valueOf(3), it.next());
    assertFalse(it.hasNext());
}
/**
 * Verifies that {@code drainTo(c)} empties the queue into the collection
 * {@code c}.
 *
 * <p>A queue built by {@code populatedQueue(SIZE)} is drained and the target
 * list is expected to hold the integers {@code 0..SIZE-1} in ascending order.
 * The queue is then refilled with {@code zero} and {@code one} and drained a
 * second time to check that it stays usable after a drain.
 */
public void testDrainTo() {
    // NOTE(review): q stays a raw type because the declared return type of
    // populatedQueue is not visible in this excerpt.
    PriorityBlockingQueue q = populatedQueue(SIZE);
    ArrayList<Integer> l = new ArrayList<Integer>();

    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first, matching the other assertions in this test.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // The queue must accept new elements after having been drained.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));

    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
/**
 * Checks that {@code isEmpty()} reports true for a new queue, false once an
 * element has been added, and true again after every element is removed.
 * Also checks that a new queue reports {@code Integer.MAX_VALUE} remaining
 * capacity.
 */
public void testEmpty() {
    PriorityBlockingQueue queue = new PriorityBlockingQueue(2);

    // Freshly constructed: nothing inside, capacity reported as unbounded.
    assertTrue(queue.isEmpty());
    assertEquals(Integer.MAX_VALUE, queue.remainingCapacity());

    // A single element is enough to make the queue non-empty.
    queue.add(one);
    assertFalse(queue.isEmpty());

    // Add a second element, then take both out again.
    queue.add(two);
    queue.remove();
    queue.remove();
    assertTrue(queue.isEmpty());
}
/**
 * Checks that {@code clear()} removes all elements, and that the queue can be
 * refilled and cleared again afterwards.
 */
public void testClear() {
    PriorityBlockingQueue queue = populatedQueue(SIZE);

    // First clear: a populated queue becomes empty with size zero.
    queue.clear();
    assertTrue(queue.isEmpty());
    assertEquals(0, queue.size());

    // The cleared queue still accepts and reports new elements.
    queue.add(one);
    assertFalse(queue.isEmpty());
    assertTrue(queue.contains(one));

    // Second clear: empty once more.
    queue.clear();
    assertTrue(queue.isEmpty());
}
/**
 * Verifies that {@code containsAll(c)} is true when {@code c} holds a subset
 * of the queue's elements and false when it does not.
 *
 * <p>{@code p} starts empty and gains the integer {@code i} on each pass, so
 * before every insertion it is checked to be contained in {@code q} while
 * not yet containing all of {@code q}. After {@code SIZE} insertions
 * {@code p} is expected to contain everything in {@code q}.
 * NOTE(review): this presumes {@code populatedQueue(SIZE)} holds the integers
 * {@code 0..SIZE-1}; confirm against the helper.
 */
public void testContainsAll() {
    // NOTE(review): q stays a raw type because the declared return type of
    // populatedQueue is not visible in this excerpt.
    PriorityBlockingQueue q = populatedQueue(SIZE);
    PriorityBlockingQueue<Integer> p = new PriorityBlockingQueue<Integer>(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        assertTrue(q.containsAll(p));
        assertFalse(p.containsAll(q));
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        p.add(Integer.valueOf(i));
    }
    assertTrue(p.containsAll(q));
}
/**
 * Checks that {@code isEmpty()} reports true for a new queue, false once an
 * element has been added, and true again after every element is removed.
 * Also checks that a new queue reports {@code Integer.MAX_VALUE} remaining
 * capacity.
 */
public void testEmpty() {
    PriorityBlockingQueue queue = new PriorityBlockingQueue(2);

    // Freshly constructed: nothing inside, capacity reported as unbounded.
    assertTrue(queue.isEmpty());
    assertEquals(Integer.MAX_VALUE, queue.remainingCapacity());

    // A single element is enough to make the queue non-empty.
    queue.add(one);
    assertFalse(queue.isEmpty());

    // Add a second element, then take both out again.
    queue.add(two);
    queue.remove();
    queue.remove();
    assertTrue(queue.isEmpty());
}
/**
 * Checks that {@code clear()} removes all elements, and that the queue can be
 * refilled and cleared again afterwards.
 */
public void testClear() {
    PriorityBlockingQueue queue = populatedQueue(SIZE);

    // First clear: a populated queue becomes empty with size zero.
    queue.clear();
    assertTrue(queue.isEmpty());
    assertEquals(0, queue.size());

    // The cleared queue still accepts and reports new elements.
    queue.add(one);
    assertFalse(queue.isEmpty());
    assertTrue(queue.contains(one));

    // Second clear: empty once more.
    queue.clear();
    assertTrue(queue.isEmpty());
}
/**
 * Verifies that {@code containsAll(c)} is true when {@code c} holds a subset
 * of the queue's elements and false when it does not.
 *
 * <p>{@code p} starts empty and gains the integer {@code i} on each pass, so
 * before every insertion it is checked to be contained in {@code q} while
 * not yet containing all of {@code q}. After {@code SIZE} insertions
 * {@code p} is expected to contain everything in {@code q}.
 * NOTE(review): this presumes {@code populatedQueue(SIZE)} holds the integers
 * {@code 0..SIZE-1}; confirm against the helper.
 */
public void testContainsAll() {
    // NOTE(review): q stays a raw type because the declared return type of
    // populatedQueue is not visible in this excerpt.
    PriorityBlockingQueue q = populatedQueue(SIZE);
    PriorityBlockingQueue<Integer> p = new PriorityBlockingQueue<Integer>(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        assertTrue(q.containsAll(p));
        assertFalse(p.containsAll(q));
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        p.add(Integer.valueOf(i));
    }
    assertTrue(p.containsAll(q));
}
/**
 * Adds the integers 1, 5, 2, 3, 4 to a {@link PriorityBlockingQueue} in that
 * order and asserts that draining the queue yields them as 1, 2, 3, 4, 5.
 */
@Test
public void givenUnorderedValues_whenPolling_thenShouldOrderQueue() throws InterruptedException {
    final PriorityBlockingQueue<Integer> unorderedQueue = new PriorityBlockingQueue<>();

    // Deliberately out of order so the assertion depends on the queue's ordering.
    for (int value : new int[] {1, 5, 2, 3, 4}) {
        unorderedQueue.add(value);
    }

    final ArrayList<Integer> drained = new ArrayList<>();
    unorderedQueue.drainTo(drained);

    assertThat(drained).containsExactly(1, 2, 3, 4, 5);
}
/**
 * Adds an outbound message to the priority queue kept for its gateway,
 * creating and registering that queue in {@code queueMap} on first use.
 *
 * @param message the message to enqueue; its gateway id selects the queue
 * @param store   when {@code true} and the message was queued,
 *                {@code storePendingMessage(message)} is also called
 * @return the value returned by the queue's {@code add} call
 */
private boolean addToGatewayQueue(OutboundMessage message, boolean store) {
    PriorityBlockingQueue<OutboundMessage> gatewayQueue = queueMap.get(message.getGatewayId());
    if (gatewayQueue == null) {
        // First message for this gateway: start with room for 50 entries,
        // ordered by PriorityComparator.
        gatewayQueue = new PriorityBlockingQueue<OutboundMessage>(50, new PriorityComparator());
        queueMap.put(message.getGatewayId(), gatewayQueue);
    }

    final boolean accepted = gatewayQueue.add(message);
    if (accepted && store) {
        storePendingMessage(message);
    }
    return accepted;
}
/**
 * Exercises the "check again later" path of {@code AnomalyDetector}.
 *
 * <p>The mocked notifier answers a broker failure with
 * {@code AnomalyNotificationResult.check(MOCK_DELAY_CHECK_MS)} and the mocked
 * executor state is {@code NO_TASK_IN_PROGRESS}. A {@code BrokerFailures}
 * anomaly is put on the queue; the test waits until the detector reports one
 * delayed check, shuts the detector down, and then asserts that no
 * self-healing was started, exactly one delayed check happened, and the
 * detector state records one broker failure and no other anomaly types.
 *
 * @throws InterruptedException if interrupted while awaiting executor termination
 */
@Test
public void testDelayedCheck() throws InterruptedException {
    PriorityBlockingQueue<Anomaly> anomalies = new PriorityBlockingQueue<>(ANOMALY_DETECTOR_INITIAL_QUEUE_SIZE,
                                                                           anomalyComparator());
    AdminClient mockAdminClient = EasyMock.createNiceMock(AdminClient.class);
    AnomalyNotifier mockAnomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
    BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
    GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
    MetricAnomalyDetector mockMetricAnomalyDetector = EasyMock.createNiceMock(MetricAnomalyDetector.class);
    TopicAnomalyDetector mockTopicAnomalyDetector = EasyMock.createNiceMock(TopicAnomalyDetector.class);
    DiskFailureDetector mockDiskFailureDetector = EasyMock.createNiceMock(DiskFailureDetector.class);
    ScheduledExecutorService mockDetectorScheduler = EasyMock.mock(ScheduledExecutorService.class);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    KafkaCruiseControl mockKafkaCruiseControl = EasyMock.mock(KafkaCruiseControl.class);

    // The notifier asks for the broker failure to be re-checked after a delay.
    EasyMock.expect(mockAnomalyNotifier.onBrokerFailure(EasyMock.isA(BrokerFailures.class)))
            .andReturn(AnomalyNotificationResult.check(MOCK_DELAY_CHECK_MS));
    EasyMock.expect(mockAnomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
    Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
    KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(props);
    EasyMock.expect(mockKafkaCruiseControl.config()).andReturn(kafkaCruiseControlConfig).times(1, 2);

    startPeriodicDetectors(mockDetectorScheduler, mockGoalViolationDetector, mockMetricAnomalyDetector, mockDiskFailureDetector,
                           mockTopicAnomalyDetector, executorService);
    // Schedule a delayed check
    EasyMock.expect(mockDetectorScheduler.schedule(EasyMock.isA(Runnable.class),
                                                   EasyMock.eq(MOCK_DELAY_CHECK_MS),
                                                   EasyMock.eq(TimeUnit.MILLISECONDS)))
            .andReturn(null);
    shutdownDetector(mockDetectorScheduler, executorService);

    // The following state are used to test the delayed check when executor is idle.
    EasyMock.expect(mockKafkaCruiseControl.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);

    replayCommonMocks(mockAnomalyNotifier, mockBrokerFailureDetector, mockGoalViolationDetector, mockMetricAnomalyDetector,
                      mockDetectorScheduler, mockKafkaCruiseControl);

    AnomalyDetector anomalyDetector = new AnomalyDetector(anomalies, mockAdminClient, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl,
                                                          mockAnomalyNotifier, mockGoalViolationDetector, mockBrokerFailureDetector,
                                                          mockMetricAnomalyDetector, mockDiskFailureDetector, mockTopicAnomalyDetector,
                                                          mockDetectorScheduler);

    try {
        anomalyDetector.startDetection();
        Map<String, Object> parameterConfigOverrides = new HashMap<>(4);
        parameterConfigOverrides.put(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, mockKafkaCruiseControl);
        parameterConfigOverrides.put(FAILED_BROKERS_OBJECT_CONFIG, Collections.singletonMap(0, 100L));
        parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, 100L);
        parameterConfigOverrides.put(BROKER_FAILURES_FIXABLE_CONFIG, true);
        anomalies.add(kafkaCruiseControlConfig.getConfiguredInstance(AnomalyDetectorConfig.BROKER_FAILURES_CLASS_CONFIG,
                                                                     BrokerFailures.class,
                                                                     parameterConfigOverrides));
        // NOTE(review): this loop busy-waits with no timeout, so the test hangs
        // if the delayed check never happens.
        while (anomalyDetector.numCheckedWithDelay() < 1) {
            // Wait for the anomaly to be checked with delay before attempting to shutdown the anomaly detector.
        }
        anomalyDetector.shutdown();
        assertEquals(0, anomalyDetector.numSelfHealingStarted());
        assertEquals(1, anomalyDetector.numCheckedWithDelay());
        assertTrue(executorService.awaitTermination(MOCK_ANOMALY_DETECTOR_SHUTDOWN_MS, TimeUnit.MILLISECONDS));
        AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
        // Expected value first, consistent with the assertions above, so that
        // failure messages label expected and actual correctly.
        assertEquals(0L, (long) anomalyDetectorState.metrics().get(NUM_SELF_HEALING_STARTED));
        assertEquals(1, anomalyDetectorState.recentAnomaliesByType().get(KafkaAnomalyType.BROKER_FAILURE).size());
        assertEquals(0, anomalyDetectorState.recentAnomaliesByType().get(KafkaAnomalyType.GOAL_VIOLATION).size());
        assertEquals(0, anomalyDetectorState.recentAnomaliesByType().get(KafkaAnomalyType.METRIC_ANOMALY).size());
    } finally {
        // Always release the real executor thread, even when an assertion fails.
        executorService.shutdown();
    }
    EasyMock.verify(mockAnomalyNotifier, mockDetectorScheduler, mockKafkaCruiseControl);
}
/**
 * Exercises {@code AnomalyDetector} while the executor reports an ongoing
 * execution.
 *
 * <p>The mocked executor state is
 * {@code INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS}. A
 * {@code GoalViolations} anomaly is put on the queue; the test waits until
 * the queue is empty, shuts the detector down, and then asserts that no
 * self-healing was started, no delayed check happened, and the detector
 * state records one goal violation and no other anomaly types.
 *
 * @throws InterruptedException if interrupted while awaiting executor termination
 */
@Test
public void testExecutionInProgress() throws InterruptedException {
    PriorityBlockingQueue<Anomaly> anomalies = new PriorityBlockingQueue<>(ANOMALY_DETECTOR_INITIAL_QUEUE_SIZE,
                                                                           anomalyComparator());
    AdminClient mockAdminClient = EasyMock.createNiceMock(AdminClient.class);
    AnomalyNotifier mockAnomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
    BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
    GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
    MetricAnomalyDetector mockMetricAnomalyDetector = EasyMock.createNiceMock(MetricAnomalyDetector.class);
    TopicAnomalyDetector mockTopicAnomalyDetector = EasyMock.createNiceMock(TopicAnomalyDetector.class);
    DiskFailureDetector mockDiskFailureDetector = EasyMock.createNiceMock(DiskFailureDetector.class);
    ScheduledExecutorService mockDetectorScheduler = EasyMock.mock(ScheduledExecutorService.class);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    KafkaCruiseControl mockKafkaCruiseControl = EasyMock.mock(KafkaCruiseControl.class);
    Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
    KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(props);
    EasyMock.expect(mockKafkaCruiseControl.config()).andReturn(kafkaCruiseControlConfig).times(2);

    startPeriodicDetectors(mockDetectorScheduler, mockGoalViolationDetector, mockMetricAnomalyDetector, mockDiskFailureDetector,
                           mockTopicAnomalyDetector, executorService);
    shutdownDetector(mockDetectorScheduler, executorService);
    EasyMock.expect(mockAnomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);

    // The executor reports an inter-broker replica movement in progress; the
    // assertions below expect neither self-healing nor a delayed check.
    EasyMock.expect(mockKafkaCruiseControl.executionState())
            .andReturn(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS);

    replayCommonMocks(mockAnomalyNotifier, mockBrokerFailureDetector, mockGoalViolationDetector, mockMetricAnomalyDetector,
                      mockDetectorScheduler, mockKafkaCruiseControl);

    AnomalyDetector anomalyDetector = new AnomalyDetector(anomalies, mockAdminClient, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl,
                                                          mockAnomalyNotifier, mockGoalViolationDetector, mockBrokerFailureDetector,
                                                          mockMetricAnomalyDetector, mockDiskFailureDetector, mockTopicAnomalyDetector,
                                                          mockDetectorScheduler);

    try {
        anomalyDetector.startDetection();
        Map<String, Object> parameterConfigOverrides = new HashMap<>(2);
        parameterConfigOverrides.put(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, mockKafkaCruiseControl);
        parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, 100L);
        anomalies.add(kafkaCruiseControlConfig.getConfiguredInstance(AnomalyDetectorConfig.GOAL_VIOLATIONS_CLASS_CONFIG,
                                                                     GoalViolations.class,
                                                                     parameterConfigOverrides));
        // NOTE(review): this loop busy-waits with no timeout, so the test hangs
        // if the queue is never drained.
        while (!anomalies.isEmpty()) {
            // Just wait for the anomalies to be drained.
        }
        anomalyDetector.shutdown();
        assertEquals(0, anomalyDetector.numSelfHealingStarted());
        assertEquals(0, anomalyDetector.numCheckedWithDelay());
        assertTrue(executorService.awaitTermination(MOCK_ANOMALY_DETECTOR_SHUTDOWN_MS, TimeUnit.MILLISECONDS));
        AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
        // Expected value first, consistent with the assertions above, so that
        // failure messages label expected and actual correctly.
        assertEquals(0L, (long) anomalyDetectorState.metrics().get(NUM_SELF_HEALING_STARTED));
        assertEquals(0, anomalyDetectorState.recentAnomaliesByType().get(KafkaAnomalyType.BROKER_FAILURE).size());
        assertEquals(1, anomalyDetectorState.recentAnomaliesByType().get(KafkaAnomalyType.GOAL_VIOLATION).size());
        assertEquals(0, anomalyDetectorState.recentAnomaliesByType().get(KafkaAnomalyType.METRIC_ANOMALY).size());
    } finally {
        // Always release the real executor thread, even when an assertion fails.
        executorService.shutdown();
    }
    EasyMock.verify(mockAnomalyNotifier, mockDetectorScheduler, mockKafkaCruiseControl);
}
/**
 * Works through the front of the deadline queue, discarding entries for
 * which {@code checkDeadline(nowNanos)} returns {@code null}, and stops at
 * the first entry whose deadline still lies in the future.
 *
 * @param nowNanos
 *            The current time in nanoseconds, compared against each entry's
 *            {@code deadlineNanos}.
 * @param deadlineQueue
 *            The priority queue of query deadlines to examine.
 */
private static void checkHeadOfDeadlineQueue(final long nowNanos,
        final PriorityBlockingQueue<QueryDeadline> deadlineQueue) {

    // Each pass takes the current head off the queue; an empty queue ends the loop.
    for (QueryDeadline head = deadlineQueue.poll(); head != null; head = deadlineQueue.poll()) {

        /*
         * A null result from checkDeadline() means the query is done, so its
         * entry simply stays removed and the next head is examined.
         */
        final boolean stillPending = head.checkDeadline(nowNanos) != null;

        if (stillPending && head.deadlineNanos > nowNanos) {
            /*
             * The head has not reached its deadline yet, hence nothing
             * behind it in priority order has either. Put it back on the
             * queue and stop.
             */
            deadlineQueue.add(head);
            break;
        }

    }

}
/**
 * Sweeps up to {@code DEADLINE_QUEUE_SCAN_SIZE} entries of the deadline queue
 * and discards those belonging to queries that are already done.
 * <p>
 * A query whose deadline is far in the future can sit in the priority queue
 * behind other unfinished queries until that deadline arrives, even when the
 * query itself terminated long ago. This periodic scan clears such entries.
 *
 * @param nowNanos
 *            The current time in nanoseconds, passed to
 *            {@code checkDeadline}.
 * @param deadlineQueue
 *            The priority queue of query deadlines to scan.
 */
private static void scanDeadlineQueue(final long nowNanos,
        final PriorityBlockingQueue<QueryDeadline> deadlineQueue) {

    // Pull at most DEADLINE_QUEUE_SCAN_SIZE entries off the queue for inspection.
    final List<QueryDeadline> drained = new ArrayList<QueryDeadline>(
            DEADLINE_QUEUE_SCAN_SIZE);
    deadlineQueue.drainTo(drained, DEADLINE_QUEUE_SCAN_SIZE);

    int ndropped = 0;
    int nrunning = 0;

    for (final QueryDeadline candidate : drained) {

        if (candidate.checkDeadline(nowNanos) == null) {
            // Nothing left to track for this entry: leave it off the queue.
            ndropped++;
            continue;
        }

        // Still live: hand the entry back to the deadline queue.
        deadlineQueue.add(candidate);
        nrunning++;

    }

    if (log.isInfoEnabled()) {
        log.info("Scan: threadhold=" + DEADLINE_QUEUE_SCAN_SIZE
                + ", ndropped=" + ndropped + ", nrunning=" + nrunning
                + ", deadlineQueueSize=" + deadlineQueue.size());
    }

}