下面列出了 java.util.concurrent.LinkedBlockingQueue#drainTo() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Reads {@code blockCount} blocks from the channel identified by {@code id} and
 * returns the memory segments collected in the reader's return queue.
 *
 * <p>Each read is issued with a segment removed from the tail of
 * {@code availableMemory}. After all reads are issued, the reader is closed via
 * {@code closeAndDelete()} and the return queue is drained into a new list.
 *
 * @param id         channel to read from
 * @param blockCount number of blocks to request
 * @return the segments drained from the return queue, in queue (FIFO) order
 * @throws IOException if creating the reader, reading, or closing fails
 */
protected List<MemorySegment> readAllBuffers(FileIOChannel.ID id, int blockCount) throws IOException {
    // we are guaranteed to stay in memory
    ensureNumBuffersReturned(blockCount);

    final LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<>();
    final BlockChannelReader<MemorySegment> channelReader = FileChannelUtil.createBlockChannelReader(
            ioManager, id, returnQueue,
            compressionEnable, compressionCodecFactory, compressionBlockSize, segmentSize);

    // Issue one read per block, each into the last free segment.
    int remaining = blockCount;
    while (remaining-- > 0) {
        channelReader.readBlock(availableMemory.remove(availableMemory.size() - 1));
    }
    channelReader.closeAndDelete();

    // Hand the collected segments back as a plain list.
    final List<MemorySegment> result = new ArrayList<>();
    returnQueue.drainTo(result);
    return result;
}
/**
 * drainTo(c) empties queue into another collection c.
 *
 * <p>Drains a queue built by {@code populatedQueue(SIZE)} and expects the target
 * list to hold values equal to 0..SIZE-1 in order, then refills the queue with
 * {@code zero} and {@code one} and checks a second drain the same way.
 */
public void testDrainTo() {
    // Left raw: the declared return type of populatedQueue is not visible here.
    LinkedBlockingQueue q = populatedQueue(SIZE);
    ArrayList<Object> l = new ArrayList<>();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        assertEquals(l.get(i), Integer.valueOf(i));
    }

    // Second round: the queue must be reusable after having been drained.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(l.get(i), Integer.valueOf(i));
    }
}
/**
 * drainTo empties full queue, unblocking a waiting put.
 *
 * <p>A second thread puts one extra element while this thread drains. The extra
 * element may end up in the drained list or remain in the queue, so only lower
 * bounds on the sizes are asserted; the first SIZE drained elements must equal
 * 0..SIZE-1 in order.
 *
 * @throws InterruptedException if interrupted while joining the producer thread
 */
public void testDrainToWithActivePut() throws InterruptedException {
    // Left raw: the declared return type of populatedQueue is not visible here.
    final LinkedBlockingQueue q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            q.put(Integer.valueOf(SIZE + 1));
        }});
    t.start();

    ArrayList<Object> l = new ArrayList<>();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        assertEquals(l.get(i), Integer.valueOf(i));
    }

    // Once the producer has finished, no element may have been lost.
    t.join();
    assertTrue(q.size() + l.size() >= SIZE);
}
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c.
 *
 * <p>For every limit from 0 to SIZE + 1 the queue is filled with 0..SIZE-1,
 * drained with that limit, checked, and then emptied for the next round.
 */
public void testDrainToN() {
    LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList<Integer> l = new ArrayList<>();
        q.drainTo(l, i);

        // At most SIZE elements are available, whatever limit was requested.
        int k = Math.min(i, SIZE);
        assertEquals(k, l.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            assertEquals(l.get(j), Integer.valueOf(j));
        }

        // Discard whatever the drain left behind before the next iteration.
        while (q.poll() != null) {
            // keep polling until the queue reports empty
        }
    }
}
/**
 * Consumes rows from {@code input} in batches until {@code process} reports completion.
 *
 * <p>Each pass drains up to {@code ROW_BATCH_SIZE} rows into a reused list. When
 * nothing was drained, {@code sleep()} is called before retrying. When
 * {@code process(batch)} returns {@code true}, the counters are logged and the
 * method returns; otherwise the loop continues indefinitely.
 *
 * @param input queue to drain rows from
 */
private void read(LinkedBlockingQueue<RowData> input) {
    final List<RowData> rows = new ArrayList<>();
    for (;;) {
        rows.clear();
        final int drained = input.drainTo(rows, ROW_BATCH_SIZE);

        if (drained <= 0) {
            // no data from queue, sleep for a while
            sleep();
            continue;
        }

        if (!process(rows)) {
            continue;
        }

        log.info("Received final packet, close output");
        log.info("processed " + dnsCounter + " DNS packets.");
        log.info("processed " + icmpCounter + " ICMP packets.");
        return;
    }
}
/**
 * drainTo(c) empties queue into another collection c.
 *
 * <p>Drains a queue built by {@code populatedQueue(SIZE)} and expects the target
 * list to hold values equal to 0..SIZE-1 in order, then refills the queue with
 * {@code zero} and {@code one} and checks a second drain the same way.
 */
public void testDrainTo() {
    // Left raw: the declared return type of populatedQueue is not visible here.
    LinkedBlockingQueue q = populatedQueue(SIZE);
    ArrayList<Object> l = new ArrayList<>();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        assertEquals(l.get(i), Integer.valueOf(i));
    }

    // Second round: the queue must be reusable after having been drained.
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(l.get(i), Integer.valueOf(i));
    }
}
/**
 * drainTo empties full queue, unblocking a waiting put.
 *
 * <p>A second thread puts one extra element while this thread drains. The extra
 * element may end up in the drained list or remain in the queue, so only lower
 * bounds on the sizes are asserted; the first SIZE drained elements must equal
 * 0..SIZE-1 in order.
 *
 * @throws InterruptedException if interrupted while joining the producer thread
 */
public void testDrainToWithActivePut() throws InterruptedException {
    // Left raw: the declared return type of populatedQueue is not visible here.
    final LinkedBlockingQueue q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            q.put(Integer.valueOf(SIZE + 1));
        }});
    t.start();

    ArrayList<Object> l = new ArrayList<>();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        assertEquals(l.get(i), Integer.valueOf(i));
    }

    // Once the producer has finished, no element may have been lost.
    t.join();
    assertTrue(q.size() + l.size() >= SIZE);
}
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c.
 *
 * <p>For every limit from 0 to SIZE + 1 the queue is filled with 0..SIZE-1,
 * drained with that limit, checked, and then emptied for the next round.
 */
public void testDrainToN() {
    LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList<Integer> l = new ArrayList<>();
        q.drainTo(l, i);

        // At most SIZE elements are available, whatever limit was requested.
        int k = Math.min(i, SIZE);
        assertEquals(k, l.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            assertEquals(l.get(j), Integer.valueOf(j));
        }

        // Discard whatever the drain left behind before the next iteration.
        while (q.poll() != null) {
            // keep polling until the queue reports empty
        }
    }
}
/**
 * Reads {@code blockCount} blocks from the channel identified by {@code id} and
 * returns the memory segments collected in the reader's return queue.
 *
 * <p>Each read is issued with a segment obtained from
 * {@code internalPool.nextSegment()}. After all reads are issued, the reader is
 * closed via {@code closeAndDelete()} and the return queue is drained into a new list.
 *
 * @param id         channel to read from
 * @param blockCount number of blocks to request
 * @return the segments drained from the return queue, in queue (FIFO) order
 * @throws IOException if creating the reader, reading, or closing fails
 */
protected List<MemorySegment> readAllBuffers(FileIOChannel.ID id, int blockCount) throws IOException {
    // we are guaranteed to stay in memory
    ensureNumBuffersReturned(blockCount);

    final LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<>();
    final BlockChannelReader<MemorySegment> channelReader = FileChannelUtil.createBlockChannelReader(
            ioManager, id, returnQueue,
            compressionEnable, compressionCodecFactory, compressionBlockSize, segmentSize);

    // Issue one read per block, each into a fresh segment from the pool.
    int remaining = blockCount;
    while (remaining-- > 0) {
        channelReader.readBlock(internalPool.nextSegment());
    }
    channelReader.closeAndDelete();

    // Hand the collected segments back as a plain list.
    final List<MemorySegment> result = new ArrayList<>();
    returnQueue.drainTo(result);
    return result;
}
/**
 * Fills a queue of capacity 16, drains at most 8 elements into a list, and
 * checks that the list and the remaining queue end up the same size.
 */
@Test
public void test_BlockingQueue() {
    final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
    int next = 0;
    while (next < 16) {
        queue.add("idx-" + next);
        next++;
    }

    final List<String> list = new ArrayList<>();
    queue.drainTo(list, 8);

    // Equal sizes mean exactly half of the 16 elements were moved.
    Assert.assertEquals(queue.size(), list.size());
}
/**
 * Puts 16 strings into a bounded queue, moves up to 8 of them into a list with
 * drainTo, and asserts that what is left in the queue matches the list in size.
 */
@Test
public void test_BlockingQueue() {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
    for (int idx = 0; idx != 16; idx += 1) {
        String element = "idx-" + idx;
        queue.add(element);
    }

    List<String> list = new ArrayList<>();
    queue.drainTo(list, 8);

    int leftInQueue = queue.size();
    int movedToList = list.size();
    Assert.assertEquals(leftInQueue, movedToList);
}
/**
 * Creates and starts {@code parallelism} daemon threads, each running
 * {@code macro} on its own stack against batches of queued messages.
 *
 * <p>Thread {@code i} reads from {@code queues[min(i, queues.length - 1)]}.
 * Each loop iteration obtains one message (a timed poll when {@code timeout > 0},
 * a blocking take otherwise), drains up to {@code maxMessages - 1} further
 * messages, pushes the batch (or {@code null} when the poll timed out) onto the
 * cleared stack, and executes {@code macro}. A thread ends when interrupted.
 */
private void initExecutors() {
    executors = new Thread[parallelism];

    for (int i = 0; i < parallelism; i++) {
        final MemoryWarpScriptStack stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
        stack.maxLimits();

        // Threads beyond the number of queues all share the last queue.
        final int queueIndex = Math.min(i, queues.length - 1);
        final LinkedBlockingQueue<List<Object>> queue = queues[queueIndex];

        final Thread executor = new Thread() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        final List<List<Object>> batch = new ArrayList<List<Object>>();

                        // Wait for the first message: bounded wait when a timeout is set,
                        // otherwise block until one arrives (take() never yields null).
                        final List<Object> head = timeout > 0
                            ? queue.poll(timeout, TimeUnit.MILLISECONDS)
                            : queue.take();

                        if (null != head) {
                            batch.add(head);
                            // Top the batch up with whatever is already queued.
                            queue.drainTo(batch, maxMessages - 1);
                        }

                        stack.clear();
                        if (batch.isEmpty()) {
                            // Timed out without a message: the macro still runs, with null as input.
                            stack.push(null);
                        } else {
                            stack.push(batch);
                        }
                        stack.exec(macro);
                    } catch (InterruptedException e) {
                        return;
                    } catch (WarpScriptStopException wsse) {
                        // Deliberately empty: the exception is dropped and the loop continues.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        executors[i] = executor;
        executor.setName("[TCP Executor on port " + port + " #" + i + "]");
        executor.setDaemon(true);
        executor.start();
    }
}