下面列出了 java.util.concurrent.BlockingQueue#poll() 的实例代码；您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Publishes a payload-less item on a freshly created leaf node and checks that a
 * subscriber on the second connection receives exactly that item.
 */
public void testSendAndReceiveNoPayload() throws Exception
{
    // Publisher side: create a uniquely named node on connection 0.
    final String nodeId = "TestNode" + System.currentTimeMillis();
    final LeafNode publisherNode =
            getPubnode(new PubSubManager(getConnection(0), getService()), nodeId, true, false);

    // Subscriber side: listen on the same node through connection 1 and collect
    // every item event in a small bounded queue.
    final BlockingQueue<ItemEventCoordinator<Item>> received =
            new ArrayBlockingQueue<ItemEventCoordinator<Item>>(3);
    final LeafNode subscriberNode =
            (LeafNode) new PubSubManager(getConnection(1), getService()).getNode(nodeId);
    subscriberNode.addItemEventListener(new ItemEventCoordinator<Item>(received, "sub1"));
    subscriberNode.subscribe(getConnection(1).getUser());

    // Publish a single item that carries only an id.
    final String publishedId = String.valueOf(System.currentTimeMillis());
    publisherNode.send(new Item(publishedId));

    // Wait up to five seconds for the notification and verify its content.
    final ItemEventCoordinator<Item> notification = received.poll(5, TimeUnit.SECONDS);
    assertEquals(1, notification.events.getItems().size());
    assertEquals(publishedId, notification.events.getItems().iterator().next().getId());
}
/**
 * Removes and returns an element, waiting until one can be polled from a sub-queue.
 *
 * <p>The starting sub-queue index is obtained from the multiplexer once, before the
 * lock is taken, and is reused for every retry.
 *
 * @return the element that was polled; never {@code null}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 */
@Override
public E take() throws InterruptedException {
    final int firstIndex = this.multiplexer.getAndAdvanceCurrentIndex();
    takeLock.lockInterruptibly();
    try {
        while (true) {
            final BlockingQueue<E> candidate = this.getFirstNonEmptyQueue(firstIndex);
            // The chosen sub-queue may have been emptied in the meantime, so poll()
            // can still come back with null.
            final E head = (candidate == null) ? null : candidate.poll();
            if (head != null) {
                return head;
            }
            // Nothing to hand out right now: wait on notEmpty, then look again.
            notEmpty.await();
        }
    } finally {
        takeLock.unlock();
    }
}
/**
 * Handles a rejected task by discarding half of the executor's queued tasks and then
 * offering the rejected task to the queue without blocking.
 *
 * @param runnable the task that could not be accepted
 * @param executor the executor that rejected the task
 */
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
    if (threadName != null) {
        LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
    }
    if (executor.isShutdown()) {
        return;
    }
    final BlockingQueue<Runnable> pending = executor.getQueue();
    // Discard half of the queued elements (integer division: 7 queued -> 3 discarded).
    for (int remaining = pending.size() >> 1; remaining > 0; remaining--) {
        // Removes the element at the head of the queue.
        pending.poll();
    }
    // Non-blocking insert; offer() simply returns false if the queue is full.
    pending.offer(runnable);
}
/**
 * Calls the reactive endpoint through the Reactor invoker and checks that the first
 * emitted bean carries the expected greeting and audience.
 */
@Test
public void testTextJsonImplicitListAsyncStream() throws Exception {
    // Beans emitted by the reactive pipeline are handed to the test thread via this queue.
    final BlockingQueue<HelloWorldBean> received = new LinkedBlockingQueue<>();
    final String endpoint = "http://localhost:" + PORT + "/reactor/mono/textJsonImplicitListAsyncStream";

    ClientBuilder.newClient()
        .register(new JacksonJsonProvider())
        .register(new ReactorInvokerProvider())
        .target(endpoint)
        .request(MediaType.APPLICATION_JSON)
        .rx(ReactorInvoker.class)
        .get(HelloWorldBean.class)
        .doOnNext(emitted -> received.offer(emitted))
        .subscribe();

    // Give the asynchronous call at most one second to deliver a result.
    final HelloWorldBean first = received.poll(1L, TimeUnit.SECONDS);
    assertNotNull(first);
    assertEquals("Hello", first.getGreeting());
    assertEquals("World", first.getAudience());
}
/**
 * Collapses the queued started/stopped events into one status change per torrent.
 *
 * <p>At most as many events are consumed as were queued when the method was entered;
 * a later event for the same torrent overwrites an earlier one. Events of any other
 * type are logged and skipped.
 *
 * @param events queue to consume events from
 * @return the latest status change seen for each torrent id
 */
private Map<TorrentId, StatusChange> foldStartStopEvents(BlockingQueue<Event> events) {
    // Fix the amount of work upfront so concurrent producers cannot keep us here forever.
    final int budget = events.size();
    final Map<TorrentId, StatusChange> folded = new HashMap<>(budget * 2);
    for (int consumed = 0; consumed < budget; consumed++) {
        final Event next = events.poll();
        if (next == null) {
            // Queue ran dry earlier than the snapshot suggested.
            break;
        }
        if (next instanceof TorrentStartedEvent) {
            folded.put(((TorrentStartedEvent) next).getTorrentId(), StatusChange.STARTED);
        } else if (next instanceof TorrentStoppedEvent) {
            folded.put(((TorrentStoppedEvent) next).getTorrentId(), StatusChange.STOPPED);
        } else {
            LOGGER.warn("Unexpected event type: " + next.getClass().getName() + ". Skipping...");
        }
    }
    return folded;
}
/**
 * Takes one element from the queue associated with {@code key} and then removes the
 * key from {@code map}.
 *
 * <p>If {@code timeOut} is not positive or {@code unit} is {@code null}, the call blocks
 * until an element is available; otherwise it waits at most the given time.
 *
 * @param key     identifies the queue to read from
 * @param timeOut maximum time to wait; values {@code <= 0} mean wait indefinitely
 * @param unit    unit of {@code timeOut}; {@code null} means wait indefinitely
 * @return the element, or {@code null} if the wait timed out or the thread was interrupted
 */
@Override
public T get(String key, long timeOut, TimeUnit unit) {
    BlockingQueue<T> q = getQueue(key);
    T t = null;
    try {
        if (timeOut <= 0 || null == unit) {
            t = q.take();
        } else {
            t = q.poll(timeOut, unit);
        }
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the interrupt flag. Restore it so callers
        // further up the stack can still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        // The entry is removed whether or not an element was obtained.
        map.remove(key);
    }
    return t;
}
/**
 * Buffers ten events on the publisher and then acknowledges with a batch identifier
 * whose id ("some_uuid_str") does not come from a delivered batch but whose flow
 * identifier does. The test expects an {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws InterruptedException {
    FanOutRecordsPublisher publisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
    FanOutRecordsPublisher.RecordFlow flow =
            new FanOutRecordsPublisher.RecordFlow(publisher, Instant.now(), "Shard-001-1");
    final int[] totalRecordsRetrieved = { 0 };
    BlockingQueue<BatchUniqueIdentifier> pendingAcks = new LinkedBlockingQueue<>();

    publisher.subscribe(new Subscriber<RecordsRetrieved>() {
        @Override
        public void onSubscribe(Subscription subscription) {
        }

        @Override
        public void onNext(RecordsRetrieved recordsRetrieved) {
            totalRecordsRetrieved[0]++;
            // Remember the identifier of every delivered batch so it can be acked later.
            pendingAcks.add(recordsRetrieved.batchUniqueIdentifier());
        }

        @Override
        public void onError(Throwable throwable) {
        }

        @Override
        public void onComplete() {
        }
    });

    // Queue up ten events, numbered 1..10, on the same record flow.
    for (int sequence = 1; sequence <= 10; sequence++) {
        publisher.bufferCurrentEventAndScheduleIfRequired(
                new FanOutRecordsPublisher.FanoutRecordsRetrieved(
                        ProcessRecordsInput.builder().build(), sequence + "", flow.getSubscribeToShardId()),
                flow);
    }

    // For at most two delivered batches, send an ack that reuses the batch's flow
    // identifier but carries a made-up record batch id.
    for (int attempt = 0; attempt < 2; attempt++) {
        final BatchUniqueIdentifier delivered = pendingAcks.poll(1000, TimeUnit.MILLISECONDS);
        if (delivered == null) {
            break;
        }
        publisher.evictAckedEventAndScheduleNextEvent(
                () -> new BatchUniqueIdentifier("some_uuid_str", delivered.getFlowIdentifier()));
    }
}
/**
 * Notifies the generator of an update change for a query constructed with both boolean
 * flags false, and expects a "stopped" event for that query on the work queue.
 */
@Test
public void notifyUpdate_isNotActive() throws Exception {
    // Flag used to tell the notifying thread to give up.
    final AtomicBoolean stopRequested = new AtomicBoolean(false);

    // Destination for the work produced by the generator.
    final BlockingQueue<QueryEvent> workQueue = new ArrayBlockingQueue<>(1);

    // Latch handed to the generator; it is already counted down to zero.
    final CountDownLatch openLatch = new CountDownLatch(1);
    openLatch.countDown();

    final QueryEventWorkGenerator workGenerator =
            new QueryEventWorkGenerator("rya", openLatch, workQueue, 50, TimeUnit.MILLISECONDS, stopRequested);

    // Deliver the update notification from a separate thread.
    final UUID queryId = UUID.randomUUID();
    final StreamsQuery streamsQuery = new StreamsQuery(queryId, "query", false, false);
    final Thread notifier = new Thread(() -> {
        final ChangeLogEntry<QueryChange> logEntry =
                new ChangeLogEntry<QueryChange>(0, QueryChange.update(queryId, false));
        workGenerator.notify(logEntry, Optional.of(streamsQuery));
    });
    notifier.start();

    try {
        // The generator should have offered a "stopped" event within half a second.
        final QueryEvent actual = workQueue.poll(500, TimeUnit.MILLISECONDS);
        final QueryEvent wanted = QueryEvent.stopped("rya", queryId);
        assertEquals(wanted, actual);
    } finally {
        stopRequested.set(true);
        notifier.join();
    }
}
/**
 * Non-blocking removal of one element from the first non-empty sub-queue, searching
 * from the index supplied by the multiplexer.
 *
 * <p>No strict consistency is provided: {@code null} may be returned even though some
 * sub-queue holds an element.
 *
 * @return the polled element, or {@code null} if nothing could be obtained
 */
@Override
public E poll() {
    final int firstIndex = this.multiplexer.getAndAdvanceCurrentIndex();
    final BlockingQueue<E> candidate = this.getFirstNonEmptyQueue(firstIndex);
    // A null candidate means every sub-queue looked empty; otherwise the sub-queue's own
    // poll() decides, and that can still yield null.
    return (candidate == null) ? null : candidate.poll();
}
/**
 * Returns the next buffered message, visiting the per-consumer buffers in the order of
 * a shared key iterator that is kept between calls.
 *
 * @return the first message found, or {@code null} if the iterator was exhausted without
 *         finding one
 */
@Override
public Message poll() {
    // Start a fresh pass when there is no iterator yet or the previous pass finished.
    if (consumerIdIterator == null || !consumerIdIterator.hasNext()) {
        consumerIdIterator = messageBuffer.keySet().iterator();
    }

    // Walk the remaining buffers until one of them yields a message.
    while (consumerIdIterator.hasNext()) {
        final VirtualSpoutIdentifier consumerId = consumerIdIterator.next();
        final BlockingQueue<Message> buffer = messageBuffer.get(consumerId);
        if (buffer == null) {
            // The key no longer maps to a buffer; restart from a fresh iterator.
            logger.debug("Non-existent queue found, resetting iterator.");
            consumerIdIterator = messageBuffer.keySet().iterator();
            continue;
        }
        final Message candidate = buffer.poll();
        if (candidate != null) {
            return candidate;
        }
    }
    return null;
}
/**
 * Attempts to remove one element without blocking.
 *
 * <p>The multiplexer supplies the index to start searching from; the first non-empty
 * sub-queue found from there is polled. The result is only weakly consistent:
 * {@code null} can be returned even though an element is present in some sub-queue.
 *
 * @return an element, or {@code null} if none could be polled
 */
@Override
public E poll() {
    final BlockingQueue<E> chosen =
            this.getFirstNonEmptyQueue(this.multiplexer.getAndAdvanceCurrentIndex());
    if (chosen != null) {
        // The sub-queue may have been emptied since it was chosen, so this can be null too.
        return chosen.poll();
    }
    // Every sub-queue appeared empty.
    return null;
}
/**
 * Fills the queue to capacity and drains it again, twice, checking {@code isEmpty()}
 * before and after every single offer and poll.
 */
@Test
public void testIsEmpty() {
    final int capacity = 100;
    final BlockingQueue<Integer> queue = new PushPullBlockingQueue<Integer>(capacity);

    Assert.assertTrue(queue.isEmpty());

    // Two identical fill/drain passes.
    for (int pass = 0; pass < 2; pass++) {
        for (int value = 0; value < capacity; value++) {
            queue.offer(Integer.valueOf(value));
            Assert.assertFalse(queue.isEmpty());
        }

        int drained = 0;
        while (drained < capacity) {
            Assert.assertFalse(queue.isEmpty());
            queue.poll();
            drained++;
        }

        Assert.assertTrue(queue.isEmpty());
    }
}
/**
 * Publishes an item before any subscriber exists, subscribes one second later, and
 * checks that the event received by the late subscriber is flagged as delayed and
 * carries a published date.
 */
public void testSendAndReceiveDelayed() throws Exception
{
    // Setup event source
    String nodeId = "TestNode" + System.currentTimeMillis();
    PubSubManager creatorMgr = new PubSubManager(getConnection(0), getService());
    LeafNode creatorNode = getPubnode(creatorMgr, nodeId, true, false);

    // Send event
    String itemId = "DelayId-" + System.currentTimeMillis();
    String payloadString = "<book xmlns='pubsub:test:book'><author>Sir Arthur Conan Doyle</author></book>";
    creatorNode.send(new PayloadItem<SimplePayload>(itemId, new SimplePayload("book", "pubsub:test:book", payloadString)));

    // Let some time pass between publishing and subscribing.
    Thread.sleep(1000);

    BlockingQueue<ItemEventCoordinator<PayloadItem<SimplePayload>>> queue = new ArrayBlockingQueue<ItemEventCoordinator<PayloadItem<SimplePayload>>>(3);

    // Setup event receiver
    PubSubManager subMgr = new PubSubManager(getConnection(1), getService());
    LeafNode subNode = (LeafNode)subMgr.getNode(nodeId);
    ItemEventCoordinator<PayloadItem<SimplePayload>> sub1Handler = new ItemEventCoordinator<PayloadItem<SimplePayload>>(queue, "sub1");
    subNode.addItemEventListener(sub1Handler);
    subNode.subscribe(getConnection(1).getUser());

    ItemEventCoordinator<PayloadItem<SimplePayload>> coord = queue.poll(5, TimeUnit.SECONDS);
    // poll() returns null on timeout; fail with a clear message instead of a bare
    // NullPointerException on the next line.
    assertNotNull("No item event was received within 5 seconds", coord);
    assertTrue(coord.events.isDelayed());
    assertNotNull(coord.events.getPublishedDate());
}
/**
 * Calls a series of methods and discards every return value.
 *
 * NOTE(review): the {@code @ExpectWarning("RV")} annotation suggests this is a fixture
 * for a static-analysis check on ignored return values, so the discarded results are
 * presumably intentional; confirm before "fixing" any of these calls.
 */
@ExpectWarning("RV")
public static void main(String args[]) throws Exception {
String str = " ttesting ";
// String is immutable: each call below returns a new value that is thrown away,
// leaving str unchanged.
str.trim();
str.toLowerCase();
str.toUpperCase();
str.replace(" ", "");
str.replace(' ', '.');
str.substring(0, 10);
str.equals("testing");
// tryAcquire reports success through its boolean result, which is ignored here.
Semaphore s = new Semaphore(17, true);
s.tryAcquire();
s.tryAcquire(12, TimeUnit.MILLISECONDS);
// offer/poll report success or the removed element through their results, ignored here.
BlockingQueue<Object> q = new LinkedBlockingQueue<Object>();
q.offer(new Object());
q.offer(new Object(), 12, TimeUnit.MILLISECONDS);
q.poll(12, TimeUnit.MILLISECONDS);
q.poll();
Lock l = new ReentrantLock();
Condition c = l.newCondition();
l.lock();
try {
// The timed waits return the remaining time or a timed-out indicator; all ignored.
c.awaitNanos(12);
c.awaitUntil(new Date());
c.await(12, TimeUnit.NANOSECONDS);
} finally {
l.unlock();
}
q.poll();
}
/**
 * Publishes three items, deletes them in two steps (one item, then the remaining two)
 * and checks that the subscriber receives one delete notification per step containing
 * exactly the deleted item ids.
 */
public void testDeleteItemAndNotify() throws Exception
{
    // Setup event source
    String nodeId = "TestNode" + System.currentTimeMillis();
    PubSubManager creatorMgr = new PubSubManager(getConnection(0), getService());
    LeafNode creatorNode = getPubnode(creatorMgr, nodeId, true, false);

    BlockingQueue<ItemDeleteCoordinator> queue = new ArrayBlockingQueue<ItemDeleteCoordinator>(3);

    // Setup event receiver
    PubSubManager subMgr = new PubSubManager(getConnection(1), getService());
    LeafNode subNode = (LeafNode)subMgr.getNode(nodeId);
    ItemDeleteCoordinator sub1Handler = new ItemDeleteCoordinator(queue, "sub1");
    subNode.addItemDeleteListener(sub1Handler);
    subNode.subscribe(getConnection(1).getUser());

    // Send event
    String itemId = String.valueOf(System.currentTimeMillis());
    Collection<Item> items = new ArrayList<Item>(3);
    String id1 = "First-" + itemId;
    String id2 = "Second-" + itemId;
    String id3 = "Third-" + itemId;
    items.add(new Item(id1));
    items.add(new Item(id2));
    items.add(new Item(id3));
    creatorNode.send(items);

    // Delete a single item and expect a notification naming just that item.
    creatorNode.deleteItem(id1);
    ItemDeleteCoordinator coord = queue.poll(5, TimeUnit.SECONDS);
    // poll() returns null on timeout; fail with a clear message rather than a
    // NullPointerException when the notification never arrives.
    assertTrue("No delete notification received for the single-item delete", coord != null);
    assertEquals(1, coord.event.getItemIds().size());
    assertEquals(id1, coord.event.getItemIds().get(0));

    // Delete the remaining two items in one request.
    creatorNode.deleteItem(Arrays.asList(id2, id3));
    coord = queue.poll(5, TimeUnit.SECONDS);
    assertTrue("No delete notification received for the two-item delete", coord != null);
    assertEquals(2, coord.event.getItemIds().size());
    assertTrue(coord.event.getItemIds().contains(id2));
    assertTrue(coord.event.getItemIds().contains(id3));
}
/**
 * Checks that log messages reach the UDP and TCP syslog queues while the servers are
 * running, that nothing arrives while they are stopped, and that delivery resumes
 * after each of two restarts.
 */
@Test
public void testReconnectSyslogServer() throws Exception {
    final BlockingQueue<SyslogServerEventIF> udp = BlockedAllProtocolsSyslogServerEventHandler.getQueue("udp");
    final BlockingQueue<SyslogServerEventIF> tcp = BlockedAllProtocolsSyslogServerEventHandler.getQueue("tcp");
    udp.clear();
    tcp.clear();

    SyslogServerEventIF udpEvent;
    SyslogServerEventIF tcpEvent;

    // Phase 1: servers are up, both protocols must deliver.
    makeLog();
    udpEvent = udp.poll(5 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNotNull("No message was logged into the UDP syslog", udpEvent);
    tcpEvent = tcp.poll(5 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNotNull("No message was logged into the TCP syslog", tcpEvent);

    // Phase 2: servers are down, nothing may arrive.
    stopSyslogServers();
    makeLog_syslogIsOffline();
    udpEvent = udp.poll(1 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNull("Message was logged into the UDP syslog even if syslog server should be stopped", udpEvent);
    tcpEvent = tcp.poll(1 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNull("Message was logged into the TCP syslog even if syslog server should be stopped", tcpEvent);

    // Phase 3: first restart, delivery must resume.
    startSyslogServers(host);
    makeLog();
    udpEvent = udp.poll(5 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNotNull("No message was logged into the UDP syslog after first syslog server restart", udpEvent);
    tcpEvent = tcp.poll(5 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNotNull("No message was logged into the TCP syslog after first syslog server restart", tcpEvent);

    // Phase 4: servers are down again, nothing may arrive.
    stopSyslogServers();
    makeLog_syslogIsOffline();
    udpEvent = udp.poll(1 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNull("Message was logged into the UDP syslog even if syslog server should be stopped", udpEvent);
    tcpEvent = tcp.poll(1 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNull("Message was logged into the TCP syslog even if syslog server should be stopped", tcpEvent);

    // Phase 5: second restart, delivery must resume once more.
    startSyslogServers(host);
    makeLog();
    udpEvent = udp.poll(5 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNotNull("No message was logged into the UDP syslog after second syslog server restart", udpEvent);
    tcpEvent = tcp.poll(5 * ADJUSTED_SECOND, TimeUnit.MILLISECONDS);
    Assert.assertNotNull("No message was logged into the TCP syslog after second syslog server restart", tcpEvent);
}
/**
 * {@inheritDoc}
 *
 * <p>For every requested partition the buffered queue is looked up in
 * {@code bufferedMessages}. If it already holds messages they are all drained.
 * Otherwise, when {@code timeout} is non-zero, the call waits for a first message:
 * with {@code SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES} it polls in one-second
 * slices until a message arrives or {@code isAtHead} reports true; with a positive
 * timeout it polls for whatever remains of the deadline computed at the start of the
 * call. Partitions that yield no messages are left out of the result.
 *
 * @param systemStreamPartitions partitions to fetch messages for
 * @param timeout 0 to return immediately, a positive number of milliseconds to wait,
 *        or {@code SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES}
 * @return the messages obtained, keyed by partition
 * @throws InterruptedException if interrupted while waiting on a queue
 * @throws SamzaException if {@code failureCause} is set while blocking on outstanding messages
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
// One deadline for the whole call, shared by all partitions processed below.
long stopTime = clock.currentTimeMillis() + timeout;
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
metrics.incPoll();
for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
// NOTE(review): queue is dereferenced without a null check; this assumes every requested
// partition has an entry in bufferedMessages -- confirm against the registration path.
BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());
if (queue.size() > 0) {
// Fast path: messages are already buffered, take them all without blocking.
queue.drainTo(outgoingList);
} else if (timeout != 0) {
IncomingMessageEnvelope envelope = null;
// How long we can legally block (if timeout > 0)
long timeRemaining = stopTime - clock.currentTimeMillis();
if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
// Block until we get at least one message, or until we catch up to
// the head of the stream.
while (envelope == null && !isAtHead(systemStreamPartition)) {
// Check for consumerFailure and throw exception
if (this.failureCause != null) {
String message = String.format("%s: Consumer has stopped.", this);
throw new SamzaException(message, this.failureCause);
}
metrics.incBlockingPoll(systemStreamPartition);
// Poll in bounded slices so the failure and at-head conditions are re-checked regularly.
envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
}
} else if (timeout > 0 && timeRemaining > 0) {
// Block until we get at least one message.
metrics.incBlockingTimeoutPoll(systemStreamPartition);
envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
}
// If we got a message, add it.
if (envelope != null) {
outgoingList.add(envelope);
// Drain any remaining messages without blocking.
queue.drainTo(outgoingList);
}
}
// Only partitions that produced at least one message appear in the result.
if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
subtractSizeOnQDrain(systemStreamPartition, outgoingList);
}
}
return messagesToReturn;
}
/**
 * Variant of {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)} that
 * does not abort when the calling thread is interrupted while waiting. An interruption
 * is remembered, the transfer carries on, and the thread's interrupt status is set
 * again just before returning (no {@code InterruptedException} is thrown).
 *
 * @param q the blocking queue to be drained
 * @param buffer where to add the transferred elements
 * @param numElements the number of elements to be waited for
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
 * @return the number of elements transferred
 */
@Beta
@CanIgnoreReturnValue
public static <E> int drainUninterruptibly(
    BlockingQueue<E> q,
    Collection<? super E> buffer,
    int numElements,
    long timeout,
    TimeUnit unit) {
  Preconditions.checkNotNull(buffer);
  final long deadline = System.nanoTime() + unit.toNanos(timeout);
  int transferred = 0;
  boolean sawInterrupt = false;
  try {
    while (transferred < numElements) {
      // Bulk transfer first: drainTo can move everything that is already available in
      // one go, which may be cheaper than polling element by element.
      transferred += q.drainTo(buffer, numElements - transferred);
      if (transferred >= numElements) {
        break;
      }
      // Not enough was immediately available, so wait for one more element. The slot is
      // assigned exactly once, by the first poll that completes without interruption.
      E next;
      for (;;) {
        try {
          next = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
          break;
        } catch (InterruptedException ex) {
          // Remember the interruption and try again with the remaining time.
          sawInterrupt = true;
        }
      }
      if (next == null) {
        // The deadline passed without a further element showing up.
        break;
      }
      buffer.add(next);
      transferred++;
    }
  } finally {
    if (sawInterrupt) {
      Thread.currentThread().interrupt();
    }
  }
  return transferred;
}
/**
 * Checks that a single failed slot future aborts the whole deployment: no task is
 * submitted, and the slots whose futures had already completed are no longer alive.
 */
@Test
public void testOneSlotFailureAbortsDeploy() throws Exception {
    // Job graph under test, connected with a pipelined pointwise edge:
    //   (source) ----------------> (target)
    final int dop = 6;

    final JobVertex source = new JobVertex("source");
    source.setParallelism(dop);
    source.setInvokableClass(NoOpInvokable.class);

    final JobVertex target = new JobVertex("target");
    target.setParallelism(dop);
    target.setInvokableClass(NoOpInvokable.class);

    target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

    final JobGraph graph = new JobGraph(new JobID(), "test", source, target);
    graph.setScheduleMode(ScheduleMode.EAGER);
    graph.setAllowQueuedScheduling(true);

    // Slots, their futures, and the provider that hands the futures out.
    final InteractionsCountingTaskManagerGateway gateway = createTaskManager();
    final BlockingQueue<AllocationID> releasedSlots = new ArrayBlockingQueue<>(dop);
    final TestingSlotOwner owner = new TestingSlotOwner();
    owner.setReturnAllocatedSlotConsumer(
        (LogicalSlot released) -> releasedSlots.offer(released.getAllocationId()));

    final LogicalSlot[] sourceSlots = new LogicalSlot[dop];
    final LogicalSlot[] targetSlots = new LogicalSlot[dop];

    @SuppressWarnings({"unchecked", "rawtypes"})
    final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[dop];
    @SuppressWarnings({"unchecked", "rawtypes"})
    final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[dop];

    for (int subtask = 0; subtask < dop; subtask++) {
        sourceSlots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());
        targetSlots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());
        sourceFutures[subtask] = new CompletableFuture<>();
        targetFutures[subtask] = new CompletableFuture<>();
    }

    ProgrammedSlotProvider provider = new ProgrammedSlotProvider(dop);
    provider.addSlots(source.getID(), sourceFutures);
    provider.addSlots(target.getID(), targetFutures);

    final ExecutionGraph executionGraph = createExecutionGraph(graph, provider);
    executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

    // Complete only every second future (even indices) before scheduling starts.
    for (int even = 0; even < dop; even += 2) {
        sourceFutures[even].complete(sourceSlots[even]);
        targetFutures[even].complete(targetSlots[even]);
    }

    // Start scheduling, then make one of the still-pending source futures fail.
    executionGraph.scheduleForExecution();
    sourceFutures[1].completeExceptionally(new TestRuntimeException());

    // The job as a whole has to terminate within two seconds.
    executionGraph.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);

    // Give the returned slots time to show up; each poll waits at most two seconds.
    int polls = 0;
    while (polls < dop) {
        releasedSlots.poll(2000L, TimeUnit.MILLISECONDS);
        polls++;
    }

    // Nothing may have been deployed.
    assertThat(gateway.getSubmitTaskCount(), is(0));

    // Every slot whose future had completed must have been given back.
    for (int even = 0; even < dop; even += 2) {
        assertFalse(sourceSlots[even].isAlive());
        assertFalse(targetSlots[even].isAlive());
    }
}
/**
 * Starts a background thread that forwards images from the queue registered for the
 * server's port to every open WebSocket session registered for that port, and records
 * the thread in {@code portSendImgThreadMapping}.
 *
 * <p>When no new image arrives within {@code IMG_POLL_TIMEOUT}, the last image sent is
 * sent again. If no queue is registered for the port yet, the thread retries the lookup
 * every {@code WAIT_FOR_IMG_QUEUE} until one appears.
 *
 * @param server the server whose port selects the image queue and the sessions
 */
public void sendImage(MinicapJettyServer server) {
    Integer port = server.getPort();
    // First lookup happens on the calling thread, as before; the queue may not exist yet.
    BlockingQueue<byte[]> initialQueue = portQueueMapping.get(port);
    Thread sendImgThread =
        new Thread() {
            @Override
            public void run() {
                byte[] buffer = {};
                BlockingQueue<byte[]> imgdataQueue = initialQueue;
                while (!isInterrupted()) {
                    try {
                        if (imgdataQueue == null) {
                            // Look the queue up again on each pass. The value captured at
                            // start-up never changes, so waiting on it alone would
                            // sleep forever.
                            imgdataQueue = portQueueMapping.get(port);
                            if (imgdataQueue == null) {
                                Thread.sleep(WAIT_FOR_IMG_QUEUE.toMillis());
                                continue;
                            }
                        }

                        byte[] candidate;
                        byte[] currentImg =
                            imgdataQueue.poll(IMG_POLL_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                        if (currentImg == null) {
                            // Nothing new: resend a copy of the last image.
                            candidate = buffer.clone();
                        } else {
                            candidate = currentImg;
                            buffer = candidate.clone();
                        }

                        // not ready
                        if (port == null) {
                            return;
                        }

                        // Send the new img to all open WebSocket sessions
                        ConcurrentSet<Session> sessions = portSessionMapping.get(port);
                        if (sessions == null) {
                            continue;
                        }
                        for (Session session : sessions) {
                            if (!session.isOpen()) {
                                // Remove through the set already in hand; looking the
                                // mapping up again could return null if it was removed
                                // concurrently.
                                sessions.remove(session);
                            } else {
                                session.getRemote().sendBytes(ByteBuffer.wrap(candidate));
                            }
                        }
                    } catch (Throwable e) {
                        // Let the owning Thread know it's been interrupted, so it can clean up
                        interrupt();
                        logger.info("No data from minicap.");
                        // Record the actual cause as well; it was previously dropped.
                        logger.info(String.format("Image sender for port %s stopping due to: %s", port, e));
                    }
                }
                logger.info(String.format("Thread id(%s) killed.", this.getId()));
            }
        };
    sendImgThread.start();
    portSendImgThreadMapping.put(port, sendImgThread);
}