drainTo ( )源码实例Demo

java.util.concurrent.LinkedBlockingQueue#drainTo ( )源码实例Demo

下面列出了 java.util.concurrent.LinkedBlockingQueue#drainTo() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: flink   文件: BaseHybridHashTable.java
/**
 * Reads {@code blockCount} blocks from the channel identified by {@code id}
 * and returns the segments that the reader handed back.
 *
 * <p>Target segments are taken from the tail of {@code availableMemory}; the
 * reader is closed and deleted before the returned segments are collected.
 *
 * @param id         the channel to read from
 * @param blockCount the number of blocks to request from the reader
 * @return the segments drained from the reader's return queue
 * @throws IOException if creating, reading or closing the reader fails
 */
protected List<MemorySegment> readAllBuffers(FileIOChannel.ID id, int blockCount) throws IOException {
	// we are guaranteed to stay in memory
	ensureNumBuffersReturned(blockCount);

	final LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<>();
	final BlockChannelReader<MemorySegment> channelReader = FileChannelUtil.createBlockChannelReader(
			ioManager, id, returnQueue,
			compressionEnable, compressionCodecFactory, compressionBlockSize, segmentSize);

	// Issue one read request per block, each into the last available segment.
	int pending = blockCount;
	while (pending > 0) {
		final int lastIndex = availableMemory.size() - 1;
		channelReader.readBlock(availableMemory.remove(lastIndex));
		pending--;
	}
	channelReader.closeAndDelete();

	// Move everything the reader returned into a plain list for the caller.
	final List<MemorySegment> result = new ArrayList<>();
	returnQueue.drainTo(result);
	return result;
}
 
源代码2 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * drainTo(c) empties queue into another collection c, keeping the
 * elements in queue order, and the queue remains usable afterwards.
 */
public void testDrainTo() {
    LinkedBlockingQueue<Integer> q = populatedQueue(SIZE);
    ArrayList<Integer> l = new ArrayList<>();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    // Expected value goes first so a failure message reads correctly.
    for (int i = 0; i < SIZE; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // Refill with two elements and drain a second time.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
 
源代码3 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * drainTo empties full queue, unblocking a waiting put.
 *
 * <p>The element added by the helper thread may or may not be part of the
 * drained list, which is why the size checks use {@code >=}.
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final LinkedBlockingQueue<Integer> q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.put(Integer.valueOf(SIZE + 1));
        }});

    t.start();
    ArrayList<Integer> l = new ArrayList<>();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    // Expected value goes first so a failure message reads correctly.
    for (int i = 0; i < SIZE; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
    t.join();
    assertTrue(q.size() + l.size() >= SIZE);
}
 
源代码4 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c
 */
public void testDrainToN() {
    LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
    // i is the drain limit; it runs past SIZE to cover n > size as well.
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList<Integer> l = new ArrayList<>();
        q.drainTo(l, i);
        int k = Math.min(i, SIZE);
        assertEquals(k, l.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            assertEquals(Integer.valueOf(j), l.get(j));
        }
        // Discard whatever is left so the next round starts from an empty queue.
        while (q.poll() != null) {
            // keep polling until the queue reports empty
        }
    }
}
 
源代码5 项目: entrada   文件: OutputWriterImpl.java
/**
 * Consumes rows from {@code input} in batches of at most
 * {@code ROW_BATCH_SIZE} until {@code process} reports that the final
 * packet was seen, then logs the packet counters and returns.
 *
 * <p>When the queue yields nothing, {@code sleep()} is called before the
 * next attempt.
 *
 * @param input the queue to drain rows from
 */
private void read(LinkedBlockingQueue<RowData> input) {
  final List<RowData> rows = new ArrayList<>();
  for (;;) {
    rows.clear();
    final int drained = input.drainTo(rows, ROW_BATCH_SIZE);

    if (drained <= 0) {
      // no data from queue, sleep for a while
      sleep();
      continue;
    }

    final boolean finished = process(rows);
    if (!finished) {
      continue;
    }

    log.info("Received final packet, close output");
    log.info("processed " + dnsCounter + " DNS packets.");
    log.info("processed " + icmpCounter + " ICMP packets.");
    return;
  }
}
 
源代码6 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * drainTo(c) empties queue into another collection c; the drained
 * elements arrive in queue order and the queue can be reused.
 */
public void testDrainTo() {
    LinkedBlockingQueue<Integer> q = populatedQueue(SIZE);
    ArrayList<Integer> l = new ArrayList<>();

    // First drain: everything moves from q to l.
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // expected first, actual second
        assertEquals(Integer.valueOf(i), l.get(i));
    }

    // Second drain after re-adding two elements.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), l.get(i));
    }
}
 
源代码7 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * drainTo empties full queue, unblocking a waiting put.
 *
 * <p>Whether the concurrently added element lands in the drained list or
 * stays in the queue is timing dependent, so sizes are checked with
 * {@code >=}.
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final LinkedBlockingQueue<Integer> q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.put(Integer.valueOf(SIZE + 1));
        }});

    t.start();
    ArrayList<Integer> l = new ArrayList<>();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // expected first, actual second
        assertEquals(Integer.valueOf(i), l.get(i));
    }
    t.join();
    assertTrue(q.size() + l.size() >= SIZE);
}
 
源代码8 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c
 */
public void testDrainToN() {
    LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
    // Try every limit from 0 up to SIZE + 1, i.e. below, at and above the queue size.
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList<Integer> l = new ArrayList<>();
        q.drainTo(l, i);
        int k = Math.min(i, SIZE);
        assertEquals(k, l.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            // expected first, actual second
            assertEquals(Integer.valueOf(j), l.get(j));
        }
        // Empty the queue before the next iteration refills it.
        while (q.poll() != null) {
            // nothing to do with the polled element
        }
    }
}
 
源代码9 项目: flink   文件: BaseHybridHashTable.java
/**
 * Reads {@code blockCount} blocks from the channel identified by {@code id}
 * and returns the segments that the reader handed back.
 *
 * <p>Each read targets a segment obtained from
 * {@code internalPool.nextSegment()}; the reader is closed and deleted
 * before the returned segments are collected.
 *
 * @param id         the channel to read from
 * @param blockCount the number of blocks to request from the reader
 * @return the segments drained from the reader's return queue
 * @throws IOException if creating, reading or closing the reader fails
 */
protected List<MemorySegment> readAllBuffers(FileIOChannel.ID id, int blockCount) throws IOException {
	// we are guaranteed to stay in memory
	ensureNumBuffersReturned(blockCount);

	final LinkedBlockingQueue<MemorySegment> completed = new LinkedBlockingQueue<>();
	final BlockChannelReader<MemorySegment> blockReader = FileChannelUtil.createBlockChannelReader(
			ioManager, id, completed,
			compressionEnable, compressionCodecFactory, compressionBlockSize, segmentSize);

	// One read request per block, each into a fresh segment from the pool.
	for (int remaining = blockCount; remaining > 0; remaining--) {
		final MemorySegment target = internalPool.nextSegment();
		blockReader.readBlock(target);
	}
	blockReader.closeAndDelete();

	// Hand the returned segments to the caller as a plain list.
	final List<MemorySegment> collected = new ArrayList<>();
	completed.drainTo(collected);
	return collected;
}
 
源代码10 项目: tunnel   文件: EsPublisherTest.java
/**
 * Fills a bounded queue of 16 entries, drains at most 8 of them into a list,
 * and checks that the remaining queue size equals the drained count.
 */
@Test
public void test_BlockingQueue() {
    final int capacity = 16;
    LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>(capacity);
    int index = 0;
    while (index < capacity) {
        pending.add("idx-" + index);
        index++;
    }

    List<String> drained = new ArrayList<>();
    pending.drainTo(drained, 8);

    final int remaining = pending.size();
    Assert.assertEquals(remaining, drained.size());
}
 
源代码11 项目: tunnel   文件: EsPublisherTest.java
/**
 * Drains half of a full 16-element bounded queue and verifies that the
 * number of elements left behind matches the number transferred.
 */
@Test
public void test_BlockingQueue() {
    final int total = 16;
    final int drainLimit = 8;

    LinkedBlockingQueue<String> source = new LinkedBlockingQueue<>(total);
    for (int n = 0; n != total; ++n) {
        String entry = "idx-" + n;
        source.add(entry);
    }

    List<String> sink = new ArrayList<>();
    source.drainTo(sink, drainLimit);

    Assert.assertEquals(source.size(), sink.size());
}
 
源代码12 项目: warp10-platform   文件: TCPManager.java
/**
 * Creates and starts {@code parallelism} daemon threads, stored in
 * {@code executors}.
 *
 * <p>Each thread owns a dedicated {@link MemoryWarpScriptStack} and loops
 * forever: it collects a batch of messages from its queue, clears the
 * stack, pushes the batch (or {@code null} when the batch is empty) and
 * executes {@code macro} on the stack. A thread ends only when it is
 * interrupted.
 */
private void initExecutors() {

    executors = new Thread[parallelism];

    for (int i = 0; i < parallelism; i++) {

      // One stack per executor thread, with maxLimits() applied to it.
      final MemoryWarpScriptStack stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
      stack.maxLimits();

      // Executors whose index exceeds the number of queues all share the last queue.
      final LinkedBlockingQueue<List<Object>> queue = queues[Math.min(i, queues.length - 1)];

      executors[i] = new Thread() {
        @Override
        public void run() {
          while (true) {

            try {
              List<List<Object>> msgs = new ArrayList<List<Object>>();

              if (timeout > 0) {
                // Timed wait (milliseconds) for a first message; msgs stays empty on timeout.
                List<Object> msg = queue.poll(timeout, TimeUnit.MILLISECONDS);
                if (null != msg) {
                  msgs.add(msg);
                  // The first message already counts towards maxMessages.
                  queue.drainTo(msgs, maxMessages - 1);
                }
              } else {
                // No timeout configured: block until a first message is available.
                List<Object> msg = queue.take();
                msgs.add(msg);
                // The first message already counts towards maxMessages.
                queue.drainTo(msgs, maxMessages - 1);
              }

              stack.clear();

              // An empty batch (only possible after a timed-out poll) is passed to the macro as null.
              if (0 < msgs.size()) {
                stack.push(msgs);
              } else {
                stack.push(null);
              }

              stack.exec(macro);
            } catch (InterruptedException e) {
              // Interruption ends this executor thread.
              return;
            } catch (WarpScriptStopException wsse) { // ignored on purpose; NOTE(review): presumably a deliberate stop raised by the macro - confirm
            } catch (Exception e) {
              // Any other failure is printed and the loop carries on with the next batch.
              e.printStackTrace();
            }
          }
        }
      };

      executors[i].setName("[TCP Executor on port " + port + " #" + i + "]");
      executors[i].setDaemon(true);
      executors[i].start();
    }
  }