下面列出了 java.util.concurrent.ArrayBlockingQueue#drainTo() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * drainTo(c) empties queue into another collection c.
 *
 * <p>Drains a queue obtained from {@code populatedQueue(SIZE)} into a list and
 * checks that the list then holds the Integers {@code 0..SIZE-1} in order.
 * The queue is then refilled with {@code zero} and {@code one}, drained
 * again, and the list is expected to hold the Integers 0 and 1 in order.
 */
public void testDrainTo() {
    ArrayBlockingQueue q = populatedQueue(SIZE);
    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value goes first so a failure message reads correctly;
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // Second round: refill with two elements and drain again.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
/**
 * drainTo empties full queue, unblocking a waiting put.
 *
 * <p>A helper thread puts one extra element ({@code SIZE + 1}) into a queue
 * obtained from {@code populatedQueue(SIZE)} while the test thread drains it.
 * Depending on timing the extra element may or may not be part of the drained
 * list, hence the {@code >=} comparisons.
 *
 * @throws InterruptedException if interrupted while joining the helper thread
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final ArrayBlockingQueue q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            q.put(Integer.valueOf(SIZE + 1));
        }});
    t.start();

    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first, actual value second.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    t.join();
    // The extra element ends up either in the drained list or still in the queue.
    assertTrue(q.size() + l.size() >= SIZE);
}
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c.
 *
 * <p>For every limit {@code i} in {@code 0..SIZE+1} the queue is loaded with
 * the Integers {@code 0..SIZE-1}, drained with that limit, checked, and then
 * emptied before the next round.
 */
public void testDrainToN() {
    ArrayBlockingQueue q = new ArrayBlockingQueue(SIZE * 2);
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList l = new ArrayList();
        q.drainTo(l, i);

        // At most SIZE elements are available, whatever limit was requested.
        int k = Math.min(i, SIZE);
        assertEquals(k, l.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            // Expected value first, actual value second.
            assertEquals(Integer.valueOf(j), l.get(j));
        }

        // Remove whatever is left so the next round starts from an empty queue.
        do {} while (q.poll() != null);
    }
}
/**
 * drainTo(c) empties queue into another collection c.
 *
 * <p>Drains a queue obtained from {@code populatedQueue(SIZE)} into a list and
 * checks that the list then holds the Integers {@code 0..SIZE-1} in order.
 * The queue is then refilled with {@code zero} and {@code one}, drained
 * again, and the list is expected to hold the Integers 0 and 1 in order.
 */
public void testDrainTo() {
    ArrayBlockingQueue q = populatedQueue(SIZE);
    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value goes first so a failure message reads correctly;
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // Second round: refill with two elements and drain again.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
/**
 * drainTo empties full queue, unblocking a waiting put.
 *
 * <p>A helper thread puts one extra element ({@code SIZE + 1}) into a queue
 * obtained from {@code populatedQueue(SIZE)} while the test thread drains it.
 * Depending on timing the extra element may or may not be part of the drained
 * list, hence the {@code >=} comparisons.
 *
 * @throws InterruptedException if interrupted while joining the helper thread
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final ArrayBlockingQueue q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            q.put(Integer.valueOf(SIZE + 1));
        }});
    t.start();

    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first, actual value second.
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    t.join();
    // The extra element ends up either in the drained list or still in the queue.
    assertTrue(q.size() + l.size() >= SIZE);
}
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c.
 *
 * <p>For every limit {@code i} in {@code 0..SIZE+1} the queue is loaded with
 * the Integers {@code 0..SIZE-1}, drained with that limit, checked, and then
 * emptied before the next round.
 */
public void testDrainToN() {
    ArrayBlockingQueue q = new ArrayBlockingQueue(SIZE * 2);
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList l = new ArrayList();
        q.drainTo(l, i);

        // At most SIZE elements are available, whatever limit was requested.
        int k = Math.min(i, SIZE);
        assertEquals(k, l.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            // Expected value first, actual value second.
            assertEquals(Integer.valueOf(j), l.get(j));
        }

        // Remove whatever is left so the next round starts from an empty queue.
        do {} while (q.poll() != null);
    }
}
/**
 * Demonstrates {@code ArrayBlockingQueue#drainTo(Collection)}: fills a queue of
 * capacity 3 with the values 1, 2 and 3, prints it, drains it into a list, and
 * prints both the (now empty) queue and the list.
 *
 * @param args command-line arguments; not used
 * @throws InterruptedException declared by the signature; nothing in the body blocks
 */
public static void main(String[] args) throws InterruptedException {
    // Diamond operator instead of raw constructors: no unchecked-conversion warnings.
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.add(1);
    queue.add(2);
    queue.add(3);

    System.out.println("Before drainTo Operation");
    System.out.println("queue = " + queue);

    ArrayList<Integer> list = new ArrayList<>();
    // Moves every element currently in the queue into the list.
    queue.drainTo(list);

    System.out.println("After drainTo Operation");
    System.out.println("queue = " + queue);
    System.out.println("collection = " + list);
}
/**
 * Drains every non-empty bucket in {@code reportBuckets} and wraps the drained
 * items of each bucket in a {@code ReportToSendImpl} addressed to
 * {@code options.getReportEndpoint()}.
 *
 * @return one encoded report per bucket that was non-empty when inspected;
 *         an empty list when all buckets were empty
 */
@Override
public List<EncodedReport> encode() {
    final List<EncodedReport> result = new ArrayList<>(reportBuckets.size());
    for (ArrayBlockingQueue<T> queue : reportBuckets.values()) {
        if (!queue.isEmpty()) {
            // Drain TODO Small chance of brief blocking; can rework easily if this becomes a problem.
            final List<BatchedReportData> drained = new ArrayList<>(queue.size());
            queue.drainTo(drained);
            result.add(new ReportToSendImpl(options.getReportEndpoint(), drained, flushHandler));
        }
    }
    return result;
}
/**
 * Removes up to {@code MAX_CACHE} elements from the head of the queue.
 *
 * @param queue the queue to drain from
 * @param <T> element type of the queue
 * @return a new list with the drained elements, in queue order; holds at most
 *         {@code min(MAX_CACHE, queue.size())} elements, where the size is
 *         sampled once before draining
 */
private <T> List<T> pollFromQueue(ArrayBlockingQueue<T> queue) {
    final int batchLimit = Math.min(MAX_CACHE, queue.size());
    final List<T> drained = new ArrayList<>(batchLimit);
    queue.drainTo(drained, batchLimit);
    return drained;
}
/**
 * Tests that a SlotPoolImpl shutdown releases all registered slots.
 *
 * <p>Two slots are offered to the pool, the pool is closed, and the allocation
 * ids passed to the task manager gateway's free-slot callback are compared
 * with the ids of the offered slots.
 *
 * @throws Exception if setup fails or the test thread is interrupted while waiting
 */
@Test
public void testShutdownReleasesAllSlots() throws Exception {
    try (SlotPoolImpl slotPool = new SlotPoolImpl(jobId)) {
        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final int numSlotOffers = 2;
        final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
        for (int i = 0; i < numSlotOffers; i++) {
            slotOffers.add(
                new SlotOffer(
                    new AllocationID(),
                    i,
                    ResourceProfile.UNKNOWN));
        }

        // Every allocation id handed to the free-slot callback is recorded here.
        final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
        taskManagerGateway.setFreeSlotFunction(
            (AllocationID allocationID, Throwable cause) -> {
                try {
                    freedSlotQueue.put(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag that put() cleared when it threw.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally(e);
                }
            });

        final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
        assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

        // shut down the slot pool
        slotPool.close();

        // the shut down operation should have freed all registered slots
        ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
        while (freedSlots.size() < numSlotOffers) {
            // Block until at least one id is available instead of spinning on
            // drainTo, then pick up anything else that is already queued.
            freedSlots.add(freedSlotQueue.take());
            freedSlotQueue.drainTo(freedSlots);
        }
        assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
    }
}
/**
 * Tests that a SlotPoolImpl shutdown releases all registered slots.
 *
 * <p>Two slots are offered to the pool, the pool is closed, and the allocation
 * ids passed to the task manager gateway's free-slot callback are compared
 * with the ids of the offered slots.
 *
 * @throws Exception if setup fails or the test thread is interrupted while waiting
 */
@Test
public void testShutdownReleasesAllSlots() throws Exception {
    try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final int numSlotOffers = 2;
        final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
        for (int i = 0; i < numSlotOffers; i++) {
            slotOffers.add(
                new SlotOffer(
                    new AllocationID(),
                    i,
                    ResourceProfile.UNKNOWN));
        }

        // Every allocation id handed to the free-slot callback is recorded here.
        final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
        taskManagerGateway.setFreeSlotFunction(
            (AllocationID allocationID, Throwable cause) -> {
                try {
                    freedSlotQueue.put(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag that put() cleared when it threw.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally(e);
                }
            });

        final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
        assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

        // shut down the slot pool
        slotPool.close();

        // the shut down operation should have freed all registered slots
        ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
        while (freedSlots.size() < numSlotOffers) {
            // Block until at least one id is available instead of spinning on
            // drainTo, then pick up anything else that is already queued.
            freedSlots.add(freedSlotQueue.take());
            freedSlotQueue.drainTo(freedSlots);
        }
        assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
    }
}
/**
 * Removes up to {@code MAX_CACHE} elements from the head of the queue.
 *
 * @param queue the queue to drain from
 * @param <T> element type of the queue
 * @return a new list with the drained elements, in queue order; holds at most
 *         {@code min(MAX_CACHE, queue.size())} elements, where the size is
 *         sampled once before draining
 */
private <T> List<T> pollFromQueue(ArrayBlockingQueue<T> queue) {
    final int batchLimit = Math.min(MAX_CACHE, queue.size());
    final List<T> drained = new ArrayList<>(batchLimit);
    queue.drainTo(drained, batchLimit);
    return drained;
}
/**
 * 39469 (NOTE(review): presumably an issue or ticket number; its source is not
 * visible from this file).
 *
 * <p>Pushes {@code totalSize} integers through an {@code ArrayBlockingQueue} of
 * capacity 5000 while a consumer thread repeatedly empties it with
 * {@code drainTo}. The consumer prints the milliseconds elapsed between its
 * first non-empty drain and the moment it has received {@code totalSize}
 * elements.
 *
 * @throws InterruptedException if interrupted while waiting for queue space or
 *         for the consumer thread to finish
 */
@Test
public void testABQ() throws InterruptedException {
    final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5000);
    Thread consumer = new Thread(new Runnable() {
        @Override
        public void run() {
            long startTime = -1;
            int dataCounter = 0;
            while (true) {
                ArrayList<Integer> data = new ArrayList<>();
                queue.drainTo(data);
                // Start the clock at the first drain that actually delivered data.
                if (startTime == -1 && data.size() > 0) {
                    startTime = System.currentTimeMillis();
                }
                dataCounter += data.size();
                if (dataCounter == totalSize) {
                    break;
                }
            }
            System.out.println("time cost:" + (System.currentTimeMillis() - startTime));
        }
    });
    consumer.start();

    for (int i = 0; i < totalSize; i++) {
        // put() waits for free space. The previous add()-in-a-retry-loop used the
        // "Queue full" exception as flow control and silently swallowed every
        // other exception as well.
        queue.put(i);
    }
    consumer.join();
}
/**
 * Tests that a SlotPoolImpl shutdown releases all registered slots.
 *
 * <p>Two slots are offered to the pool, the pool is closed, and the allocation
 * ids passed to the task manager gateway's free-slot callback are compared
 * with the ids of the offered slots.
 *
 * @throws Exception if setup fails or the test thread is interrupted while waiting
 */
@Test
public void testShutdownReleasesAllSlots() throws Exception {
    try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final int numSlotOffers = 2;
        final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
        for (int i = 0; i < numSlotOffers; i++) {
            slotOffers.add(
                new SlotOffer(
                    new AllocationID(),
                    i,
                    ResourceProfile.ANY));
        }

        // Every allocation id handed to the free-slot callback is recorded here.
        final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
        taskManagerGateway.setFreeSlotFunction(
            (AllocationID allocationID, Throwable cause) -> {
                try {
                    freedSlotQueue.put(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag that put() cleared when it threw.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally(e);
                }
            });

        final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
        assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

        // shut down the slot pool
        slotPool.close();

        // the shut down operation should have freed all registered slots
        ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
        while (freedSlots.size() < numSlotOffers) {
            // Block until at least one id is available instead of spinning on
            // drainTo, then pick up anything else that is already queued.
            freedSlots.add(freedSlotQueue.take());
            freedSlotQueue.drainTo(freedSlots);
        }
        assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
    }
}