下面列出了 java.util.concurrent.ArrayBlockingQueue#poll() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c.
 *
 * For every limit i in [0, SIZE + 2) the queue is filled with the integers
 * 0..SIZE-1, drained with that limit, and then checked: the target list holds
 * the first min(i, SIZE) integers in insertion order and the queue keeps the
 * remainder. The queue is emptied before the next round.
 */
public void testDrainToN() {
    ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(SIZE * 2);
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList<Integer> drained = new ArrayList<>();
        q.drainTo(drained, i);
        // The limit may exceed the number of queued elements; only SIZE are available.
        int k = Math.min(i, SIZE);
        assertEquals(k, drained.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            assertEquals(Integer.valueOf(j), drained.get(j));
        }
        // Discard whatever was not drained so the next round starts from an empty queue.
        while (q.poll() != null) {
            // keep polling
        }
    }
}
/**
 * Repeatedly polls {@code frontier.quickReceivedURLs} (waiting up to one second
 * per attempt) and enqueues the content of each received list into
 * {@code frontier.receivedURLs}, until {@code stop} is observed to be set.
 *
 * Any {@code Throwable} raised by the loop is logged and ends the loop; a
 * completion message is logged in every case.
 */
@Override
public void run() {
    try {
        final ByteArrayDiskQueue diskQueue = frontier.receivedURLs;
        final ArrayBlockingQueue<ByteArrayList> quickQueue = frontier.quickReceivedURLs;
        for (;;) {
            if (stop) {
                break;
            }
            final ByteArrayList batch = quickQueue.poll(1, TimeUnit.SECONDS);
            if (batch == null) {
                // Timed out without receiving anything; re-check the stop flag.
                continue;
            }
            diskQueue.enqueue(batch.elements(), 0, batch.size());
        }
    } catch (Throwable t) {
        LOGGER.error("Unexpected exception ", t);
    }
    LOGGER.info("Completed");
}
/**
 * drainTo(c, n) empties first min(n, size) elements of queue into c.
 *
 * For every limit i in [0, SIZE + 2) the queue is filled with the integers
 * 0..SIZE-1, drained with that limit, and then checked: the target list holds
 * the first min(i, SIZE) integers in insertion order and the queue keeps the
 * remainder. The queue is emptied before the next round.
 */
public void testDrainToN() {
    ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(SIZE * 2);
    for (int i = 0; i < SIZE + 2; ++i) {
        for (int j = 0; j < SIZE; j++) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            assertTrue(q.offer(Integer.valueOf(j)));
        }
        ArrayList<Integer> drained = new ArrayList<>();
        q.drainTo(drained, i);
        // The limit may exceed the number of queued elements; only SIZE are available.
        int k = Math.min(i, SIZE);
        assertEquals(k, drained.size());
        assertEquals(SIZE - k, q.size());
        for (int j = 0; j < k; ++j) {
            assertEquals(Integer.valueOf(j), drained.get(j));
        }
        // Discard whatever was not drained so the next round starts from an empty queue.
        while (q.poll() != null) {
            // keep polling
        }
    }
}
/**
 * Runs an {@code IdGrabber} obtained from the search client on a producer
 * thread and counts, on a consumer thread, the ids it publishes to a bounded
 * queue. The consumer stops at the first empty set it receives (presumably the
 * grabber's end-of-data marker -- confirm against IdGrabber), after which the
 * total number of ids is compared with the expected count.
 *
 * @throws Exception if the client cannot be created or a join is interrupted
 */
@Test
public void testIDGrabbing() throws Exception {
    SearchClient searchClient = SearchClientFactory.getClient(TMDB_URL);
    final ArrayBlockingQueue<Set<String>> ids = new ArrayBlockingQueue<>(10);
    IdGrabber grabber = searchClient.getIdGrabber(ids, 1000, 1, Collections.EMPTY_SET);
    Thread producer = new Thread(new FutureTask(grabber));
    producer.start();
    final AtomicInteger idCounter = new AtomicInteger(0);
    Thread consumer = new Thread() {
        @Override
        public void run() {
            while (true) {
                Set<String> set;
                try {
                    set = ids.poll(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt and keep spinning: restore the
                    // flag and stop consuming so the test can finish and report.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (set == null) {
                    // Nothing arrived within the timeout; try again.
                    continue;
                }
                if (set.isEmpty()) {
                    break;
                }
                idCounter.addAndGet(set.size());
            }
        }
    };
    consumer.start();
    producer.join();
    consumer.join();
    assertEquals(27846, idCounter.get());
}
/**
 * Fills a queue of random capacity and fairness, then creates one iterator per
 * head position while rotating the content (poll one element, add one), and
 * finally calls {@code clear()}.
 *
 * Afterwards {@code itrs(q)} must be null and every previously created
 * iterator must be reported as detached; iterator number j may still report
 * {@code hasNext()} and yield the value j exactly once. The helpers
 * {@code itrs}, {@code trackedIterators}, {@code isDetached} and
 * {@code assertIteratorExhausted} are defined elsewhere.
 */
public void clear_willClearItrs() {
    final boolean fair = rnd.nextBoolean();
    final int capacity = rnd.nextInt(2, 10);
    final ArrayBlockingQueue q = new ArrayBlockingQueue(capacity, fair);
    randomizePutIndex(q);
    final List<Iterator> iterators = new ArrayList<>();
    for (int value = 0; value < capacity; value++) {
        assertTrue(q.add(value));
    }
    assertNull(itrs(q));
    for (int step = 0; step < capacity; step++) {
        iterators.add(q.iterator());
        assertEquals(trackedIterators(q), iterators);
        q.poll();
        q.add(capacity + step);
    }
    q.clear();
    assertNull(itrs(q));
    for (int j = 0; j < iterators.size(); j++) {
        final Iterator it = iterators.get(j);
        assertTrue(isDetached(it));
        // Randomly exercise hasNext() and/or next() on the detached iterator.
        if (rnd.nextBoolean()) {
            assertTrue(it.hasNext());
        }
        if (rnd.nextBoolean()) {
            assertEquals(it.next(), j);
            assertIteratorExhausted(it);
        }
    }
}
/**
 * Fills a queue of random capacity and fairness, creates one iterator per head
 * position while rotating the content (poll one element, add one), and then
 * empties the queue by polling {@code capacity} times.
 *
 * Afterwards {@code itrs(q)} must be null and every previously created
 * iterator must be reported as detached; iterator number j may still report
 * {@code hasNext()} and yield the value j exactly once. The helpers
 * {@code itrs}, {@code trackedIterators}, {@code isDetached} and
 * {@code assertIteratorExhausted} are defined elsewhere.
 */
public void queueEmptyingWillClearItrs() {
    final boolean fair = rnd.nextBoolean();
    final int capacity = rnd.nextInt(2, 10);
    final ArrayBlockingQueue q = new ArrayBlockingQueue(capacity, fair);
    randomizePutIndex(q);
    final List<Iterator> iterators = new ArrayList<>();
    for (int value = 0; value < capacity; value++) {
        q.add(value);
    }
    assertNull(itrs(q));
    for (int step = 0; step < capacity; step++) {
        iterators.add(q.iterator());
        assertEquals(trackedIterators(q), iterators);
        q.poll();
        q.add(capacity + step);
    }
    // Drain everything that is still queued.
    for (int polled = 0; polled < capacity; polled++) {
        q.poll();
    }
    assertNull(itrs(q));
    for (int j = 0; j < iterators.size(); j++) {
        final Iterator it = iterators.get(j);
        assertTrue(isDetached(it));
        // Randomly exercise hasNext() and/or next() on the detached iterator.
        if (rnd.nextBoolean()) {
            assertTrue(it.hasNext());
        }
        if (rnd.nextBoolean()) {
            assertEquals(it.next(), j);
            assertIteratorExhausted(it);
        }
    }
}
/**
 * Fills a queue of random capacity and fairness, then rotates its content
 * (poll one element, add one) for three further passes of {@code capacity}
 * steps each.
 *
 * During the first two passes an iterator is created at every step and
 * {@code trackedIterators(q)} must equal all iterators created so far. During
 * the third pass no iterator is created and {@code trackedIterators(q)} must
 * equal only those created in the second pass. At the end {@code itrs(q)} must
 * be null and every iterator must be detached; iterator number j may still
 * report {@code hasNext()} and yield the value j exactly once. The helpers
 * {@code itrs}, {@code trackedIterators}, {@code isDetached} and
 * {@code assertIteratorExhausted} are defined elsewhere.
 */
public void advancing2CyclesWillRemoveIterators() {
    final boolean fair = rnd.nextBoolean();
    final int capacity = rnd.nextInt(2, 10);
    final ArrayBlockingQueue q = new ArrayBlockingQueue(capacity, fair);
    final List<Iterator> iterators = new ArrayList<>();
    for (int value = 0; value < capacity; value++) {
        q.add(value);
    }
    assertNull(itrs(q));
    for (int value = capacity; value < 3 * capacity; value++) {
        iterators.add(q.iterator());
        assertEquals(trackedIterators(q), iterators);
        q.poll();
        q.add(value);
    }
    // The list is not modified from here on, so one view of the second pass suffices.
    final List<Iterator> secondPass = iterators.subList(capacity, 2 * capacity);
    for (int value = 3 * capacity; value < 4 * capacity; value++) {
        assertEquals(trackedIterators(q), secondPass);
        q.poll();
        q.add(value);
    }
    assertNull(itrs(q));
    for (int j = 0; j < iterators.size(); j++) {
        final Iterator it = iterators.get(j);
        assertTrue(isDetached(it));
        // Randomly exercise hasNext() and/or next() on the detached iterator.
        if (rnd.nextBoolean()) {
            assertTrue(it.hasNext());
        }
        if (rnd.nextBoolean()) {
            assertEquals(it.next(), j);
            assertIteratorExhausted(it);
        }
    }
}
/**
 * Sends four requests over a full-duplex call in lock step: each request is
 * sent only after the event queue is observed empty, and the next event is
 * then awaited (up to {@code operationTimeoutMillis()}) and compared through
 * {@code assertResponse} with a golden response whose payload is a zero-filled
 * array of the size the request asked for. An error event fails the test.
 * After the last exchange the request stream is completed and the completion
 * event is awaited.
 *
 * @throws Exception if waiting on the event queue is interrupted
 */
@Test
public void pingPong() throws Exception {
    // Exchange i asks for responseSizes[i] bytes and sends requestSizes[i] bytes.
    final int[] responseSizes = {31415, 9, 2653, 58979};
    final int[] requestSizes = {27182, 8, 1828, 45904};
    final StreamingOutputCallRequest[] requestArray =
        new StreamingOutputCallRequest[responseSizes.length];
    final StreamingOutputCallResponse[] goldenArray =
        new StreamingOutputCallResponse[responseSizes.length];
    for (int i = 0; i < responseSizes.length; i++) {
        requestArray[i] = StreamingOutputCallRequest.newBuilder()
            .addResponseParameters(ResponseParameters.newBuilder().setSize(responseSizes[i]))
            .setPayload(Payload.newBuilder()
                .setBody(ByteString.copyFrom(new byte[requestSizes[i]])))
            .build();
        goldenArray[i] = StreamingOutputCallResponse.newBuilder()
            .setPayload(Payload.newBuilder()
                .setBody(ByteString.copyFrom(new byte[responseSizes[i]])))
            .build();
    }
    final List<StreamingOutputCallRequest> requests = Arrays.asList(requestArray);
    final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(goldenArray);

    // Every callback of the response observer is recorded as one queue entry.
    final ArrayBlockingQueue<Object> events = new ArrayBlockingQueue<>(5);
    StreamObserver<StreamingOutputCallRequest> requestObserver
        = asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
            @Override
            public void onNext(StreamingOutputCallResponse response) {
                events.add(response);
            }

            @Override
            public void onError(Throwable t) {
                events.add(t);
            }

            @Override
            public void onCompleted() {
                events.add("Completed");
            }
        });

    int index = 0;
    for (StreamingOutputCallRequest request : requests) {
        assertNull(events.peek());
        requestObserver.onNext(request);
        Object received = events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
        assertNotNull("Timed out waiting for response", received);
        if (received instanceof Throwable) {
            throw new AssertionError(received);
        }
        assertResponse(goldenResponses.get(index), (StreamingOutputCallResponse) received);
        index++;
    }
    requestObserver.onCompleted();
    assertEquals("Completed", events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
}
/**
 * Starts a streaming-output call that asks for two responses (100000 and
 * 100001 bytes) and checks that messages are delivered only when requested:
 * after {@code request(1)} the first response arrives; without a further
 * request nothing else may arrive during a quiet period of four times the
 * measured first-response latency (at least 1 ms); after a second
 * {@code request(1)} the second response and then {@code Status.OK} arrive.
 *
 * @throws Exception if waiting on the event queue is interrupted
 */
@Test
public void serverStreamingShouldBeFlowControlled() throws Exception {
    final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
        .addResponseParameters(ResponseParameters.newBuilder().setSize(100000))
        .addResponseParameters(ResponseParameters.newBuilder().setSize(100001))
        .build();
    final StreamingOutputCallResponse firstGolden = StreamingOutputCallResponse.newBuilder()
        .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[100000])))
        .build();
    final StreamingOutputCallResponse secondGolden = StreamingOutputCallResponse.newBuilder()
        .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[100001])))
        .build();

    final long startNanos = System.nanoTime();

    // Messages and the final status are recorded in arrival order.
    final ArrayBlockingQueue<Object> events = new ArrayBlockingQueue<>(10);
    ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
        channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT);
    call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
        @Override
        public void onHeaders(Metadata headers) {}

        @Override
        public void onMessage(final StreamingOutputCallResponse message) {
            events.add(message);
        }

        @Override
        public void onClose(Status status, Metadata trailers) {
            events.add(status);
        }
    }, new Metadata());
    call.sendMessage(request);
    call.halfClose();

    // Time how long it takes to get the first response.
    call.request(1);
    final Object first = events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    assertTrue(first instanceof StreamingOutputCallResponse);
    assertResponse(firstGolden, (StreamingOutputCallResponse) first);
    final long firstCallDuration = System.nanoTime() - startNanos;

    // Without giving additional flow control, make sure that we don't get another response. We
    // wait until we are comfortable the next message isn't coming. We may have very low nanoTime
    // resolution (like on Windows) or be using a testing, in-process transport where message
    // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up the wait
    // to at least 1ms.
    final long quietPeriodNanos =
        Math.max(firstCallDuration * 4, TimeUnit.MILLISECONDS.toNanos(1));
    assertNull(events.poll(quietPeriodNanos, TimeUnit.NANOSECONDS));

    // Make sure that everything still completes.
    call.request(1);
    final Object second = events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    assertTrue(second instanceof StreamingOutputCallResponse);
    assertResponse(secondGolden, (StreamingOutputCallResponse) second);
    assertEquals(Status.OK, events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
}
/**
 * Verify that onExit completes for a non-child process only when
 * the process has exited.
 * Spawn a child (A) waiting to be commanded to exit.
 * Spawn a child (B) to wait for that process to exit.
 * Command (A) to exit.
 * Check that (B) does not complete until (A) has exited.
 *
 * Both children are forcibly destroyed at the end. If spawning a child fails,
 * the corresponding variable is still null, so cleanup is skipped for it and
 * the failure reported by {@code Assert.fail} is what the test surfaces.
 */
@Test
public static void peerOnExitTest() {
    String line = null;
    ArrayBlockingQueue<String> alines = new ArrayBlockingQueue<>(100);
    ArrayBlockingQueue<String> blines = new ArrayBlockingQueue<>(100);
    JavaChild A = null;
    try {
        String[] split;
        A = JavaChild.spawnJavaChild("stdin");
        A.forEachOutputLine(l -> alines.add(l));

        // Verify A is running
        A.sendAction("pid");
        do {
            split = getSplitLine(alines);
        } while (!"pid".equals(split[1]));

        JavaChild B = null;
        try {
            B = JavaChild.spawnJavaChild("stdin");
            B.forEachOutputLine(l -> blines.add(l));

            // Verify B is running
            B.sendAction("pid");
            do {
                split = getSplitLine(blines);
            } while (!"pid".equals(split[1]));

            // Tell B to wait for A's pid
            B.sendAction("waitpid", A.pid());

            // Wait a bit to see if B will prematurely report the termination of A
            try {
                line = blines.poll(5L, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Assert.fail("interrupted", ie);
            }
            Assert.assertNull(line, "waitpid didn't wait");

            A.sendAction("exit", 0L);

            // Look for B to report that A has exited
            do {
                split = getSplitLine(blines);
            } while (!"waitpid".equals(split[1]));
            Assert.assertEquals(split[2], "false", "Process A should not be alive");

            B.sendAction("exit", 0L);
        } catch (IOException ioe) {
            Assert.fail("unable to start JavaChild B", ioe);
        } finally {
            // B is null when spawnJavaChild threw; without this guard a
            // NullPointerException would replace the failure raised above.
            if (B != null) {
                B.destroyForcibly();
            }
        }
    } catch (IOException ioe2) {
        Assert.fail("unable to start JavaChild A", ioe2);
    } finally {
        // Same guard for A: it is null when its spawn failed.
        if (A != null) {
            A.destroyForcibly();
        }
    }
}
/**
 * Sends four requests over a full-duplex call in lock step: each request is
 * sent only after the event queue is observed empty, and the next event is
 * then awaited (up to {@code operationTimeoutMillis()}) and compared through
 * {@code assertResponse} with a golden response whose payload is a zero-filled
 * array of the size the request asked for. An error event fails the test.
 * After the last exchange the request stream is completed and the completion
 * event is awaited.
 *
 * @throws Exception if waiting on the event queue is interrupted
 */
@Test
public void pingPong() throws Exception {
    // Exchange i asks for responseSizes[i] bytes and sends requestSizes[i] bytes.
    final int[] responseSizes = {31415, 9, 2653, 58979};
    final int[] requestSizes = {27182, 8, 1828, 45904};
    final StreamingOutputCallRequest[] requestArray =
        new StreamingOutputCallRequest[responseSizes.length];
    final StreamingOutputCallResponse[] goldenArray =
        new StreamingOutputCallResponse[responseSizes.length];
    for (int i = 0; i < responseSizes.length; i++) {
        requestArray[i] = StreamingOutputCallRequest.newBuilder()
            .addResponseParameters(ResponseParameters.newBuilder().setSize(responseSizes[i]))
            .setPayload(Payload.newBuilder()
                .setBody(ByteString.copyFrom(new byte[requestSizes[i]])))
            .build();
        goldenArray[i] = StreamingOutputCallResponse.newBuilder()
            .setPayload(Payload.newBuilder()
                .setBody(ByteString.copyFrom(new byte[responseSizes[i]])))
            .build();
    }
    final List<StreamingOutputCallRequest> requests = Arrays.asList(requestArray);
    final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(goldenArray);

    // Every callback of the response observer is recorded as one queue entry.
    final ArrayBlockingQueue<Object> events = new ArrayBlockingQueue<>(5);
    StreamObserver<StreamingOutputCallRequest> requestObserver
        = asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
            @Override
            public void onNext(StreamingOutputCallResponse response) {
                events.add(response);
            }

            @Override
            public void onError(Throwable t) {
                events.add(t);
            }

            @Override
            public void onCompleted() {
                events.add("Completed");
            }
        });

    int index = 0;
    for (StreamingOutputCallRequest request : requests) {
        assertNull(events.peek());
        requestObserver.onNext(request);
        Object received = events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
        assertNotNull("Timed out waiting for response", received);
        if (received instanceof Throwable) {
            throw new AssertionError(received);
        }
        assertResponse(goldenResponses.get(index), (StreamingOutputCallResponse) received);
        index++;
    }
    requestObserver.onCompleted();
    assertEquals("Completed", events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
}
/**
 * Starts a streaming-output call that asks for two responses (100000 and
 * 100001 bytes) and checks that messages are delivered only when requested:
 * after {@code request(1)} the first response arrives; without a further
 * request nothing else may arrive during a quiet period of four times the
 * measured first-response latency (at least 1 ms); after a second
 * {@code request(1)} the second response and then {@code Status.OK} arrive.
 *
 * @throws Exception if waiting on the event queue is interrupted
 */
@Test
public void serverStreamingShouldBeFlowControlled() throws Exception {
    final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
        .addResponseParameters(ResponseParameters.newBuilder().setSize(100000))
        .addResponseParameters(ResponseParameters.newBuilder().setSize(100001))
        .build();
    final StreamingOutputCallResponse firstGolden = StreamingOutputCallResponse.newBuilder()
        .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[100000])))
        .build();
    final StreamingOutputCallResponse secondGolden = StreamingOutputCallResponse.newBuilder()
        .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[100001])))
        .build();

    final long startNanos = System.nanoTime();

    // Messages and the final status are recorded in arrival order.
    final ArrayBlockingQueue<Object> events = new ArrayBlockingQueue<>(10);
    ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
        channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT);
    call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
        @Override
        public void onHeaders(Metadata headers) {}

        @Override
        public void onMessage(final StreamingOutputCallResponse message) {
            events.add(message);
        }

        @Override
        public void onClose(Status status, Metadata trailers) {
            events.add(status);
        }
    }, new Metadata());
    call.sendMessage(request);
    call.halfClose();

    // Time how long it takes to get the first response.
    call.request(1);
    final Object first = events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    assertTrue(first instanceof StreamingOutputCallResponse);
    assertResponse(firstGolden, (StreamingOutputCallResponse) first);
    final long firstCallDuration = System.nanoTime() - startNanos;

    // Without giving additional flow control, make sure that we don't get another response. We
    // wait until we are comfortable the next message isn't coming. We may have very low nanoTime
    // resolution (like on Windows) or be using a testing, in-process transport where message
    // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up the wait
    // to at least 1ms.
    final long quietPeriodNanos =
        Math.max(firstCallDuration * 4, TimeUnit.MILLISECONDS.toNanos(1));
    assertNull(events.poll(quietPeriodNanos, TimeUnit.NANOSECONDS));

    // Make sure that everything still completes.
    call.request(1);
    final Object second = events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    assertTrue(second instanceof StreamingOutputCallResponse);
    assertResponse(secondGolden, (StreamingOutputCallResponse) second);
    assertEquals(Status.OK, events.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
}