java.util.concurrent.ArrayBlockingQueue#drainTo() 源码实例 Demo

下面列出了 java.util.concurrent.ArrayBlockingQueue#drainTo() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: openjdk-jdk9   文件: ArrayBlockingQueueTest.java
/**
 * Verifies that drainTo(c) moves every element of the queue into the
 * collection c, leaving the queue empty, and that the queue can be
 * refilled and drained again afterwards.
 */
public void testDrainTo() {
    ArrayBlockingQueue queue = populatedQueue(SIZE);
    ArrayList drained = new ArrayList();

    // First pass: drain the pre-populated queue.
    queue.drainTo(drained);
    assertEquals(0, queue.size());
    assertEquals(SIZE, drained.size());
    int index = 0;
    while (index < SIZE) {
        assertEquals(drained.get(index), new Integer(index));
        ++index;
    }

    // Second pass: put two elements back and drain them into the cleared list.
    queue.add(zero);
    queue.add(one);
    assertFalse(queue.isEmpty());
    assertTrue(queue.contains(zero));
    assertTrue(queue.contains(one));
    drained.clear();
    queue.drainTo(drained);
    assertEquals(0, queue.size());
    assertEquals(2, drained.size());
    index = 0;
    while (index < 2) {
        assertEquals(drained.get(index), new Integer(index));
        ++index;
    }
}
 
源代码2 项目: openjdk-jdk9   文件: ArrayBlockingQueueTest.java
/**
 * Checks that draining a full queue with drainTo unblocks a thread that
 * is waiting in put.
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final ArrayBlockingQueue queue = populatedQueue(SIZE);

    // Producer that tries to put one extra element into the queue.
    CheckedRunnable extraPut = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            queue.put(new Integer(SIZE + 1));
        }
    };
    Thread producer = new Thread(extraPut);
    producer.start();

    ArrayList drained = new ArrayList();
    queue.drainTo(drained);
    // The extra element may or may not have been drained too, hence ">=".
    assertTrue(drained.size() >= SIZE);
    int index = 0;
    while (index < SIZE) {
        assertEquals(drained.get(index), new Integer(index));
        ++index;
    }

    producer.join();
    assertTrue(queue.size() + drained.size() >= SIZE);
}
 
源代码3 项目: openjdk-jdk9   文件: ArrayBlockingQueueTest.java
/**
 * Verifies that drainTo(c, n) moves the first min(n, size) elements of
 * the queue into c and leaves the remaining elements in the queue.
 */
public void testDrainToN() {
    ArrayBlockingQueue queue = new ArrayBlockingQueue(SIZE * 2);
    for (int limit = 0; limit < SIZE + 2; ++limit) {
        // Refill the (currently empty) queue with 0 .. SIZE-1.
        for (int value = 0; value < SIZE; value++) {
            assertTrue(queue.offer(new Integer(value)));
        }

        ArrayList drained = new ArrayList();
        queue.drainTo(drained, limit);

        int expected = Math.min(limit, SIZE);
        assertEquals(expected, drained.size());
        assertEquals(SIZE - expected, queue.size());
        for (int pos = 0; pos < expected; ++pos) {
            assertEquals(drained.get(pos), new Integer(pos));
        }

        // Discard whatever is left so the next iteration starts empty.
        while (queue.poll() != null) {
            // keep polling
        }
    }
}
 
源代码4 项目: j2objc   文件: ArrayBlockingQueueTest.java
/**
 * Verifies that drainTo(c) transfers all queued elements into the
 * collection c and empties the queue, both for the initially populated
 * queue and after two further elements have been added.
 */
public void testDrainTo() {
    ArrayBlockingQueue source = populatedQueue(SIZE);
    ArrayList sink = new ArrayList();

    // Drain the populated queue and check size and contents of the sink.
    source.drainTo(sink);
    assertEquals(0, source.size());
    assertEquals(SIZE, sink.size());
    int pos = 0;
    while (pos < SIZE) {
        assertEquals(sink.get(pos), new Integer(pos));
        ++pos;
    }

    // Re-add two elements, then drain again into the emptied sink.
    source.add(zero);
    source.add(one);
    assertFalse(source.isEmpty());
    assertTrue(source.contains(zero));
    assertTrue(source.contains(one));
    sink.clear();
    source.drainTo(sink);
    assertEquals(0, source.size());
    assertEquals(2, sink.size());
    pos = 0;
    while (pos < 2) {
        assertEquals(sink.get(pos), new Integer(pos));
        ++pos;
    }
}
 
源代码5 项目: j2objc   文件: ArrayBlockingQueueTest.java
/**
 * Checks that drainTo on a full queue releases a thread blocked in put.
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final ArrayBlockingQueue source = populatedQueue(SIZE);

    // Background task that puts one additional element into the queue.
    CheckedRunnable putTask = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            source.put(new Integer(SIZE + 1));
        }
    };
    Thread putter = new Thread(putTask);
    putter.start();

    ArrayList sink = new ArrayList();
    source.drainTo(sink);
    // At least the SIZE original elements must have been drained.
    assertTrue(sink.size() >= SIZE);
    int pos = 0;
    while (pos < SIZE) {
        assertEquals(sink.get(pos), new Integer(pos));
        ++pos;
    }

    putter.join();
    assertTrue(source.size() + sink.size() >= SIZE);
}
 
源代码6 项目: j2objc   文件: ArrayBlockingQueueTest.java
/**
 * Verifies that drainTo(c, n) transfers exactly min(n, size) elements
 * from the head of the queue into c.
 */
public void testDrainToN() {
    ArrayBlockingQueue source = new ArrayBlockingQueue(SIZE * 2);
    for (int maxElements = 0; maxElements < SIZE + 2; ++maxElements) {
        // Load SIZE elements (0 .. SIZE-1) for this round.
        for (int value = 0; value < SIZE; value++) {
            assertTrue(source.offer(new Integer(value)));
        }

        ArrayList sink = new ArrayList();
        source.drainTo(sink, maxElements);

        int moved = Math.min(maxElements, SIZE);
        assertEquals(moved, sink.size());
        assertEquals(SIZE - moved, source.size());
        for (int pos = 0; pos < moved; ++pos) {
            assertEquals(sink.get(pos), new Integer(pos));
        }

        // Empty the queue before the next round.
        while (source.poll() != null) {
            // keep polling
        }
    }
}
 
/**
 * Demonstrates {@code ArrayBlockingQueue.drainTo(Collection)}: fills a
 * queue of capacity 3 with the values 1, 2 and 3, prints it, drains it
 * into a list, and prints both the (now empty) queue and the list.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException declared for interface compatibility; no
 *         call in this method blocks
 */
public static void main(String[] args) throws InterruptedException {
    // Diamond operator instead of raw types: avoids unchecked-conversion warnings.
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.add(1);
    queue.add(2);
    queue.add(3);
    System.out.println("Before drainTo Operation");
    System.out.println("queue = " + queue);

    ArrayList<Integer> list = new ArrayList<>();

    // Moves all available elements from the queue into the list.
    queue.drainTo(list);
    System.out.println("After drainTo Operation");
    System.out.println("queue = " + queue);
    System.out.println("collection = " + list);
}
 
源代码8 项目: apiman-plugins   文件: ReporterImpl.java
/**
 * Drains every non-empty report bucket and wraps the drained reports of
 * each bucket in a {@code ReportToSendImpl} addressed to the configured
 * report endpoint.
 *
 * @return one encoded report per bucket that was non-empty when visited
 */
@Override
public List<EncodedReport> encode() {
    final List<EncodedReport> result = new ArrayList<>(reportBuckets.size());
    for (ArrayBlockingQueue<T> bucket : reportBuckets.values()) {
        if (!bucket.isEmpty()) {
            // Drain TODO Small chance of brief blocking; can rework easily if this becomes a problem.
            final List<BatchedReportData> drained = new ArrayList<>(bucket.size());
            bucket.drainTo(drained);
            result.add(new ReportToSendImpl(options.getReportEndpoint(), drained, flushHandler));
        }
    }
    return result;
}
 
源代码9 项目: tunnel   文件: HBaseClient.java
/**
 * Drains up to {@code MAX_CACHE} elements from the given queue into a new list.
 *
 * @param queue the queue to drain from
 * @return a new list holding the drained elements (possibly empty)
 */
private <T> List<T> pollFromQueue(ArrayBlockingQueue<T> queue) {
    // Batch size is the current queue size, capped at MAX_CACHE.
    final int batchSize = Math.min(MAX_CACHE, queue.size());
    final List<T> drained = new ArrayList<>(batchSize);
    queue.drainTo(drained, batchSize);
    return drained;
}
 
源代码10 项目: Flink-CEPplus   文件: SlotPoolImplTest.java
/**
 * Tests that a SlotPoolImpl shutdown releases all registered slots.
 *
 * <p>Two slot offers are handed to the pool, the pool is closed, and the allocation IDs that
 * reach the task manager gateway's free-slot function are compared with the offered ones.
 */
@Test
public void testShutdownReleasesAllSlots() throws Exception {

	try (SlotPoolImpl slotPool = new SlotPoolImpl(jobId)) {
		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());

		final int numSlotOffers = 2;

		final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);

		for (int i = 0; i < numSlotOffers; i++) {
			slotOffers.add(
				new SlotOffer(
					new AllocationID(),
					i,
					ResourceProfile.UNKNOWN));
		}

		// records every allocation ID for which the free-slot function is invoked
		final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);

		taskManagerGateway.setFreeSlotFunction(
			(AllocationID allocationID, Throwable cause) -> {
				try {
					freedSlotQueue.put(allocationID);
					return CompletableFuture.completedFuture(Acknowledge.get());
				} catch (InterruptedException e) {
					return FutureUtils.completedExceptionally(e);
				}
			});

		final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

		assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

		// shut down the slot pool
		slotPool.close();

		// the shut down operation should have freed all registered slots
		ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);

		// block on take() rather than spinning on drainTo(): the waiting thread
		// sleeps until the next freed slot arrives instead of burning CPU
		while (freedSlots.size() < numSlotOffers) {
			freedSlots.add(freedSlotQueue.take());
		}

		assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
	}
}
 
源代码11 项目: flink   文件: SlotPoolImplTest.java
/**
 * Tests that a SlotPoolImpl shutdown releases all registered slots.
 *
 * <p>Two slot offers are handed to the pool, the pool is closed, and the allocation IDs that
 * reach the task manager gateway's free-slot function are compared with the offered ones.
 */
@Test
public void testShutdownReleasesAllSlots() throws Exception {

	try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());

		final int numSlotOffers = 2;

		final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);

		for (int i = 0; i < numSlotOffers; i++) {
			slotOffers.add(
				new SlotOffer(
					new AllocationID(),
					i,
					ResourceProfile.UNKNOWN));
		}

		// records every allocation ID for which the free-slot function is invoked
		final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);

		taskManagerGateway.setFreeSlotFunction(
			(AllocationID allocationID, Throwable cause) -> {
				try {
					freedSlotQueue.put(allocationID);
					return CompletableFuture.completedFuture(Acknowledge.get());
				} catch (InterruptedException e) {
					return FutureUtils.completedExceptionally(e);
				}
			});

		final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

		assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

		// shut down the slot pool
		slotPool.close();

		// the shut down operation should have freed all registered slots
		ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);

		// block on take() rather than spinning on drainTo(): the waiting thread
		// sleeps until the next freed slot arrives instead of burning CPU
		while (freedSlots.size() < numSlotOffers) {
			freedSlots.add(freedSlotQueue.take());
		}

		assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
	}
}
 
源代码12 项目: tunnel   文件: HBaseClient.java
/**
 * Moves at most {@code MAX_CACHE} elements out of the given queue into a fresh list.
 *
 * @param queue the queue to drain from
 * @return a new list containing the elements that were drained (possibly empty)
 */
private <T> List<T> pollFromQueue(ArrayBlockingQueue<T> queue) {
    // Never take more than MAX_CACHE elements in one call.
    final int limit = Math.min(MAX_CACHE, queue.size());
    final List<T> batch = new ArrayList<>(limit);
    queue.drainTo(batch, limit);
    return batch;
}
 
源代码13 项目: DataCarrier   文件: VsABQ.java
/**
 * Times how long one consumer thread takes to drain {@code totalSize} elements from an
 * {@link ArrayBlockingQueue} of capacity 5000 while the calling thread produces them.
 * The elapsed time is printed by the consumer once all elements have been received.
 *
 * <p>39469 — presumably a recorded "time cost" value from an earlier run; TODO confirm.
 *
 * @throws InterruptedException if interrupted while waiting for the consumer thread to finish
 */
@Test
public void testABQ() throws InterruptedException {
    final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5000);

    Thread consumer = new Thread(new Runnable() {
        @Override
        public void run() {
            long startTime = -1;
            int dataCounter = 0;

            while (true) {
                ArrayList<Integer> data = new ArrayList<Integer>();
                queue.drainTo(data);
                // Start the clock when the first batch of data arrives.
                if (startTime == -1 && data.size() > 0) {
                    startTime = System.currentTimeMillis();
                }

                dataCounter += data.size();
                if (dataCounter == totalSize) {
                    break;
                }
            }

            System.out.println("time cost:" + (System.currentTimeMillis() - startTime));
        }
    });
    consumer.start();

    for (int i = 0; i < totalSize; i++) {
        // Retry until the queue has room. offer() signals a full queue by returning
        // false, so no exception has to be thrown and swallowed on every failed attempt.
        while (!queue.offer(i)) {
            // queue is full; spin and try again
        }
    }

    consumer.join();
}
 
源代码14 项目: flink   文件: SlotPoolImplTest.java
/**
 * Tests that a SlotPoolImpl shutdown releases all registered slots.
 *
 * <p>Two slot offers are handed to the pool, the pool is closed, and the allocation IDs that
 * reach the task manager gateway's free-slot function are compared with the offered ones.
 */
@Test
public void testShutdownReleasesAllSlots() throws Exception {

	try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());

		final int numSlotOffers = 2;

		final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);

		for (int i = 0; i < numSlotOffers; i++) {
			slotOffers.add(
				new SlotOffer(
					new AllocationID(),
					i,
					ResourceProfile.ANY));
		}

		// records every allocation ID for which the free-slot function is invoked
		final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);

		taskManagerGateway.setFreeSlotFunction(
			(AllocationID allocationID, Throwable cause) -> {
				try {
					freedSlotQueue.put(allocationID);
					return CompletableFuture.completedFuture(Acknowledge.get());
				} catch (InterruptedException e) {
					return FutureUtils.completedExceptionally(e);
				}
			});

		final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

		assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

		// shut down the slot pool
		slotPool.close();

		// the shut down operation should have freed all registered slots
		ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);

		// block on take() rather than spinning on drainTo(): the waiting thread
		// sleeps until the next freed slot arrives instead of burning CPU
		while (freedSlots.size() < numSlotOffers) {
			freedSlots.add(freedSlotQueue.take());
		}

		assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
	}
}