java.util.concurrent.BlockingQueue#poll() 源码实例 Demo

下面列出了 java.util.concurrent.BlockingQueue#poll() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: Smack   文件: TestEvents.java
/**
 * Publishes an item without payload from one connection and checks that a
 * subscriber on a second connection is notified with exactly that item.
 *
 * @throws Exception if node setup, subscription or publishing fails
 */
public void testSendAndReceiveNoPayload() throws Exception
{
	// Setup event source
	String nodeId = "TestNode" + System.currentTimeMillis();
	PubSubManager creatorMgr = new PubSubManager(getConnection(0), getService());
	LeafNode creatorNode = getPubnode(creatorMgr, nodeId, true, false);

	BlockingQueue<ItemEventCoordinator<Item>> queue = new ArrayBlockingQueue<ItemEventCoordinator<Item>>(3);

	// Setup event receiver
	PubSubManager subMgr = new PubSubManager(getConnection(1), getService());
	LeafNode subNode = (LeafNode)subMgr.getNode(nodeId);

	ItemEventCoordinator<Item> sub1Handler = new ItemEventCoordinator<Item>(queue, "sub1");
	subNode.addItemEventListener(sub1Handler);
	subNode.subscribe(getConnection(1).getUser());

	// Send event
	String itemId = String.valueOf(System.currentTimeMillis());
	creatorNode.send(new Item(itemId));

	// poll() returns null on timeout; fail with a clear message instead of an NPE below.
	ItemEventCoordinator<Item> coord = queue.poll(5, TimeUnit.SECONDS);
	assertNotNull("No item event was received within 5 seconds", coord);
	assertEquals(1, coord.events.getItems().size());
	assertEquals(itemId, coord.events.getItems().iterator().next().getId());
}
 
源代码2 项目: hadoop   文件: FairCallQueue.java
/**
 * Removes and returns an element, waiting while none can be polled.
 *
 * The starting sub-queue index is taken from the multiplexer once, before
 * the lock is acquired, and reused for every scan.
 *
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 */
@Override
public E take() throws InterruptedException {
  final int firstIdx = this.multiplexer.getAndAdvanceCurrentIndex();

  takeLock.lockInterruptibly();
  try {
    while (true) {
      final BlockingQueue<E> candidate = this.getFirstNonEmptyQueue(firstIdx);
      // The chosen sub-queue may have been emptied in the meantime, so poll can still yield null.
      final E head = (candidate == null) ? null : candidate.poll();
      if (head != null) {
        return head;
      }

      // Nothing could be polled: wait on notEmpty, then scan again.
      notEmpty.await();
    }
  } finally {
    takeLock.unlock();
  }
}
 
源代码3 项目: BootNettyRpc   文件: DiscardedPolicyWithReport.java
/**
 * Handles a rejected task by discarding the older half of the executor's
 * queue and then trying to enqueue the rejected task.
 *
 * Nothing is discarded or enqueued when the executor is already shut down.
 *
 * @param runnable the task that was rejected
 * @param executor the executor that rejected it
 */
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
    if (threadName != null) {
        LOG.error("Thread pool [{}] is exhausted, executor={}", threadName, executor.toString());
    }

    if (executor.isShutdown()) {
        return;
    }

    final BlockingQueue<Runnable> pending = executor.getQueue();

    // Discard half of the queued elements, rounded down (e.g. 3 out of 7).
    int toDiscard = pending.size() / 2;
    while (toDiscard-- > 0) {
        // Remove the element at the head of the queue.
        pending.poll();
    }

    // Non-blocking insert: returns false (ignored here) if the queue is full.
    pending.offer(runnable);
}
 
源代码4 项目: cxf   文件: MonoReactorTest.java
/**
 * Requests the "textJsonImplicitListAsyncStream" resource reactively and
 * checks the first bean that arrives within one second.
 */
@Test
public void testTextJsonImplicitListAsyncStream() throws Exception {
    final String endpoint = "http://localhost:" + PORT + "/reactor/mono/textJsonImplicitListAsyncStream";
    final BlockingQueue<HelloWorldBean> received = new LinkedBlockingQueue<>();

    // Each emitted bean is handed over to the test thread through the queue.
    ClientBuilder.newClient()
            .register(new JacksonJsonProvider())
            .register(new ReactorInvokerProvider())
            .target(endpoint)
            .request(MediaType.APPLICATION_JSON)
            .rx(ReactorInvoker.class)
            .get(HelloWorldBean.class)
            .doOnNext(item -> received.offer(item))
            .subscribe();

    final HelloWorldBean first = received.poll(1L, TimeUnit.SECONDS);
    assertNotNull(first);
    assertEquals("Hello", first.getGreeting());
    assertEquals("World", first.getAudience());
}
 
源代码5 项目: bt   文件: LocalServiceDiscoveryService.java
/**
 * Collapses queued started/stopped events into one status change per torrent.
 *
 * At most as many events are consumed as were queued when the method was
 * entered; a later event for the same torrent overwrites an earlier one.
 * Events of any other type are logged and skipped.
 *
 * @param events queue to consume events from
 * @return map from torrent id to the last status change seen for it
 */
private Map<TorrentId, StatusChange> foldStartStopEvents(BlockingQueue<Event> events) {
    // Fix the batch size upfront so concurrently added events are left for a later call.
    final int batchSize = events.size();

    final Map<TorrentId, StatusChange> folded = new HashMap<>(batchSize * 2);
    for (int remaining = batchSize; remaining > 0; remaining--) {
        final Event next = events.poll();
        if (next == null) {
            // Queue ran dry earlier than expected.
            break;
        }

        if (next instanceof TorrentStartedEvent) {
            folded.put(((TorrentStartedEvent) next).getTorrentId(), StatusChange.STARTED);
        } else if (next instanceof TorrentStoppedEvent) {
            folded.put(((TorrentStoppedEvent) next).getTorrentId(), StatusChange.STOPPED);
        } else {
            LOGGER.warn("Unexpected event type: " + next.getClass().getName() + ". Skipping...");
        }
    }
    return folded;
}
 
源代码6 项目: oim-fx   文件: QueueBlockingHandler.java
/**
 * Waits for a value on the queue associated with {@code key}.
 *
 * Blocks indefinitely when {@code timeOut <= 0} or {@code unit} is null;
 * otherwise waits at most the given time. The key is removed from
 * {@code map} whether or not a value was obtained.
 *
 * @param key     identifies the queue to read from
 * @param timeOut maximum time to wait; non-positive means wait without limit
 * @param unit    unit of {@code timeOut}; null means wait without limit
 * @return the value taken from the queue, or null on timeout or interruption
 */
@Override
public T get(String key, long timeOut, TimeUnit unit) {
	BlockingQueue<T> q = getQueue(key);
	T t = null;
	try {
		if (timeOut <= 0 || null == unit) {
			t = q.take();
		} else {
			t = q.poll(timeOut, unit);
		}
	} catch (InterruptedException e) {
		// Restore the interrupt flag so callers further up can still see the interruption.
		Thread.currentThread().interrupt();
		e.printStackTrace();
	} finally {
		map.remove(key);
	}
	return t;
}
 
/**
 * Buffers ten events, then acknowledges up to two delivered batches with a
 * batch id that does not match ("some_uuid_str") while keeping the real flow
 * identifier. The test passes only if an IllegalStateException is thrown.
 */
@Test(expected = IllegalStateException.class)
public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws InterruptedException {
    final FanOutRecordsPublisher publisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
    final FanOutRecordsPublisher.RecordFlow flow =
            new FanOutRecordsPublisher.RecordFlow(publisher, Instant.now(), "Shard-001-1");
    final int[] deliveredBatches = { 0 };
    final BlockingQueue<BatchUniqueIdentifier> pendingAcks = new LinkedBlockingQueue<>();

    publisher.subscribe(new Subscriber<RecordsRetrieved>() {
        @Override public void onSubscribe(Subscription subscription) {}
        @Override public void onNext(RecordsRetrieved recordsRetrieved) {
            deliveredBatches[0]++;
            // Remember the identifier of every delivered batch so it can be acked later.
            pendingAcks.add(recordsRetrieved.batchUniqueIdentifier());
        }
        @Override public void onError(Throwable throwable) {}
        @Override public void onComplete() {}
    });

    // Queue up ten events with batch ids "1" .. "10".
    for (int batch = 1; batch <= 10; batch++) {
        publisher.bufferCurrentEventAndScheduleIfRequired(
                new FanOutRecordsPublisher.FanoutRecordsRetrieved(
                        ProcessRecordsInput.builder().build(), batch + "", flow.getSubscribeToShardId()),
                flow);
    }

    // Answer at most two deliveries with an ack whose batch id does not match.
    for (int attempt = 0; attempt < 2; attempt++) {
        final BatchUniqueIdentifier delivered = pendingAcks.poll(1000, TimeUnit.MILLISECONDS);
        if (delivered == null) {
            break;
        }
        publisher.evictAckedEventAndScheduleNextEvent(
                () -> new BatchUniqueIdentifier("some_uuid_str", delivered.getFlowIdentifier()));
    }
}
 
源代码8 项目: rya   文件: QueryEventWorkGeneratorTest.java
/**
 * Notifies the generator of an update change built with
 * {@code QueryChange.update(queryId, false)} and expects a "stopped"
 * QueryEvent for that query to appear on the work queue.
 */
@Test
public void notifyUpdate_isNotActive() throws Exception {
    // Flag used to tell the notifying thread to give up.
    final AtomicBoolean stopRequested = new AtomicBoolean(false);

    // Single-slot queue that receives the generated work.
    final BlockingQueue<QueryEvent> workQueue = new ArrayBlockingQueue<>(1);

    // The latch is counted down right away, before the generator is created.
    final CountDownLatch startGate = new CountDownLatch(1);
    startGate.countDown();
    final QueryEventWorkGenerator workGenerator =
            new QueryEventWorkGenerator("rya", startGate, workQueue, 50, TimeUnit.MILLISECONDS, stopRequested);

    // Background thread that delivers one update change to the generator.
    final UUID queryId = UUID.randomUUID();
    final StreamsQuery query = new StreamsQuery(queryId, "query", false, false);
    final Thread notifier = new Thread(() -> {
        workGenerator.notify(
                new ChangeLogEntry<QueryChange>(0, QueryChange.update(queryId, false)),
                Optional.of(query));
    });
    notifier.start();

    try {
        // The generator should have queued a "stopped" event for the query.
        final QueryEvent actual = workQueue.poll(500, TimeUnit.MILLISECONDS);
        final QueryEvent wanted = QueryEvent.stopped("rya", queryId);
        assertEquals(wanted, actual);
    } finally {
        stopRequested.set(true);
        notifier.join();
    }
}
 
源代码9 项目: hadoop   文件: FairCallQueue.java
/**
 * Non-blocking removal of an element from the first non-empty sub-queue.
 *
 * There is no strict consistency guarantee: null may be returned even though
 * some sub-queue holds an element.
 *
 * @return the polled element, or null if nothing could be polled
 */
@Override
public E poll() {
  final int firstIdx = this.multiplexer.getAndAdvanceCurrentIndex();
  final BlockingQueue<E> candidate = this.getFirstNonEmptyQueue(firstIdx);

  // A null candidate means every sub-queue looked empty; otherwise delegate
  // to the sub-queue, whose own poll may still come back with null.
  return (candidate == null) ? null : candidate.poll();
}
 
/**
 * Returns the next buffered message, visiting the per-consumer buffers in
 * iterator order and resuming where the previous call stopped.
 *
 * @return the first message found, or null once the iterator is exhausted
 */
@Override
public Message poll() {
    // Start a fresh pass when there is no iterator yet or the last pass finished.
    if (consumerIdIterator == null || !consumerIdIterator.hasNext()) {
        consumerIdIterator = messageBuffer.keySet().iterator();
    }

    // Walk the remaining buffers until one of them yields a message.
    while (consumerIdIterator.hasNext()) {
        final VirtualSpoutIdentifier consumerId = consumerIdIterator.next();
        final BlockingQueue<Message> buffer = messageBuffer.get(consumerId);

        if (buffer == null) {
            // No buffer is mapped to this id any more; restart from a fresh iterator.
            logger.debug("Non-existent queue found, resetting iterator.");
            consumerIdIterator = messageBuffer.keySet().iterator();
            continue;
        }

        final Message found = buffer.poll();
        if (found != null) {
            return found;
        }
    }
    return null;
}
 
源代码11 项目: big-c   文件: FairCallQueue.java
/**
 * Polls the first non-empty sub-queue, starting the search at the index
 * handed out by the multiplexer.
 *
 * Consistency is not strict: the result can be null even though an element
 * is present in one of the sub-queues.
 *
 * @return an element, or null when none could be obtained
 */
@Override
public E poll() {
  final BlockingQueue<E> nonEmpty =
      this.getFirstNonEmptyQueue(this.multiplexer.getAndAdvanceCurrentIndex());

  if (nonEmpty != null) {
    // The sub-queue's poll may still return null.
    return nonEmpty.poll();
  }

  // All sub-queues appeared empty.
  return null;
}
 
源代码12 项目: disruptor   文件: PushPullBlockingQueueTest.java
/**
 * Fills the queue to capacity and drains it again, twice, checking
 * {@code isEmpty()} before and after every single offer and poll.
 */
@Test
public void testIsEmpty() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);

    Assert.assertTrue(dbq.isEmpty());

    // Two identical fill/drain rounds: the queue must report empty after each.
    for (int round = 0; round < 2; round++) {
        int offered = 0;
        while (offered < cap) {
            dbq.offer(Integer.valueOf(offered));
            Assert.assertFalse(dbq.isEmpty());
            offered++;
        }

        int polled = 0;
        while (polled < cap) {
            Assert.assertFalse(dbq.isEmpty());
            dbq.poll();
            polled++;
        }

        Assert.assertTrue(dbq.isEmpty());
    }
}
 
源代码13 项目: Smack   文件: TestEvents.java
/**
 * Publishes a payload item before any subscriber exists, then subscribes
 * from a second connection and checks that the event received is flagged as
 * delayed and carries a published date.
 *
 * @throws Exception if node setup, subscription or publishing fails
 */
public void testSendAndReceiveDelayed() throws Exception
{
	// Setup event source
	String nodeId = "TestNode" + System.currentTimeMillis();
	PubSubManager creatorMgr = new PubSubManager(getConnection(0), getService());

	LeafNode creatorNode = getPubnode(creatorMgr, nodeId, true, false);

	// Send event
	String itemId = "DelayId-" + System.currentTimeMillis();
	String payloadString = "<book xmlns='pubsub:test:book'><author>Sir Arthur Conan Doyle</author></book>";
	creatorNode.send(new PayloadItem<SimplePayload>(itemId, new SimplePayload("book", "pubsub:test:book", payloadString)));

	// Let some time pass between publishing and subscribing.
	Thread.sleep(1000);

	BlockingQueue<ItemEventCoordinator<PayloadItem<SimplePayload>>> queue = new ArrayBlockingQueue<ItemEventCoordinator<PayloadItem<SimplePayload>>>(3);

	// Setup event receiver
	PubSubManager subMgr = new PubSubManager(getConnection(1), getService());
	LeafNode subNode = (LeafNode)subMgr.getNode(nodeId);

	ItemEventCoordinator<PayloadItem<SimplePayload>> sub1Handler = new ItemEventCoordinator<PayloadItem<SimplePayload>>(queue, "sub1");
	subNode.addItemEventListener(sub1Handler);
	subNode.subscribe(getConnection(1).getUser());

	// poll() returns null on timeout; fail with a clear message instead of an NPE below.
	ItemEventCoordinator<PayloadItem<SimplePayload>> coord = queue.poll(5, TimeUnit.SECONDS);
	assertNotNull("No delayed item event was received within 5 seconds", coord);
	assertTrue(coord.events.isDelayed());
	assertNotNull(coord.events.getPublishedDate());
}
 
源代码14 项目: spotbugs   文件: IgnoredReturnValue.java
/**
 * Analyzer fixture: every call below is written as a bare statement, so its
 * return value is discarded. The method is annotated to expect an "RV"
 * warning (presumably the return-value-ignored category; confirm against the
 * detector documentation).
 *
 * The statements are deliberately "wrong"; do not clean them up.
 */
@ExpectWarning("RV")
public static void main(String args[]) throws Exception {
    String str = " ttesting ";
    // String methods return a new value; the results are dropped here.
    str.trim();
    str.toLowerCase();
    str.toUpperCase();
    str.replace(" ", "");
    str.replace(' ', '.');
    str.substring(0, 10);
    str.equals("testing");
    // tryAcquire reports success through its boolean result, which is ignored.
    Semaphore s = new Semaphore(17, true);
    s.tryAcquire();
    s.tryAcquire(12, TimeUnit.MILLISECONDS);
    // offer/poll report success or the polled element through their result, which is ignored.
    BlockingQueue<Object> q = new LinkedBlockingQueue<Object>();
    q.offer(new Object());
    q.offer(new Object(), 12, TimeUnit.MILLISECONDS);
    q.poll(12, TimeUnit.MILLISECONDS);
    q.poll();
    Lock l = new ReentrantLock();
    Condition c = l.newCondition();
    l.lock();
    try {
        // The timed await variants return a value as well; it is ignored.
        c.awaitNanos(12);
        c.awaitUntil(new Date());
        c.await(12, TimeUnit.NANOSECONDS);
    } finally {
        l.unlock();
    }

    q.poll();
}
 
源代码15 项目: Smack   文件: TestEvents.java
/**
 * Publishes three items, deletes them in two steps (one item, then the other
 * two) and checks that the subscriber receives a delete notification listing
 * the right item ids after each step.
 *
 * @throws Exception if node setup, subscription, publishing or deletion fails
 */
public void testDeleteItemAndNotify() throws Exception
{
	// Setup event source
	String nodeId = "TestNode" + System.currentTimeMillis();
	PubSubManager creatorMgr = new PubSubManager(getConnection(0), getService());

	LeafNode creatorNode = getPubnode(creatorMgr, nodeId, true, false);

	BlockingQueue<ItemDeleteCoordinator> queue = new ArrayBlockingQueue<ItemDeleteCoordinator>(3);

	// Setup event receiver
	PubSubManager subMgr = new PubSubManager(getConnection(1), getService());
	LeafNode subNode = (LeafNode)subMgr.getNode(nodeId);

	ItemDeleteCoordinator sub1Handler = new ItemDeleteCoordinator(queue, "sub1");
	subNode.addItemDeleteListener(sub1Handler);
	subNode.subscribe(getConnection(1).getUser());

	// Send event
	String itemId = String.valueOf(System.currentTimeMillis());

	Collection<Item> items = new ArrayList<Item>(3);
	String id1 = "First-" + itemId;
	String id2 = "Second-" + itemId;
	String id3 = "Third-" + itemId;
	items.add(new Item(id1));
	items.add(new Item(id2));
	items.add(new Item(id3));
	creatorNode.send(items);

	creatorNode.deleteItem(id1);

	// poll() returns null on timeout; fail with a clear message instead of an NPE below.
	ItemDeleteCoordinator coord = queue.poll(5, TimeUnit.SECONDS);
	assertNotNull("No delete event was received for the first item within 5 seconds", coord);
	assertEquals(1, coord.event.getItemIds().size());
	assertEquals(id1, coord.event.getItemIds().get(0));

	creatorNode.deleteItem(Arrays.asList(id2, id3));

	coord = queue.poll(5, TimeUnit.SECONDS);
	assertNotNull("No delete event was received for the remaining items within 5 seconds", coord);
	assertEquals(2, coord.event.getItemIds().size());
	assertTrue(coord.event.getItemIds().contains(id2));
	assertTrue(coord.event.getItemIds().contains(id3));
}
 
/**
 * Checks logging to syslog across two stop/start cycles of the syslog
 * servers: a message must arrive on both the UDP and the TCP queue while the
 * servers run, and nothing may arrive on either queue while they are stopped.
 */
@Test
public void testReconnectSyslogServer() throws Exception {
    final BlockingQueue<SyslogServerEventIF> udpEvents = BlockedAllProtocolsSyslogServerEventHandler.getQueue("udp");
    final BlockingQueue<SyslogServerEventIF> tcpEvents = BlockedAllProtocolsSyslogServerEventHandler.getQueue("tcp");
    udpEvents.clear();
    tcpEvents.clear();

    // How long to wait for a message that must arrive / that must not arrive.
    final long deliveryTimeout = 5 * ADJUSTED_SECOND;
    final long silenceTimeout = 1 * ADJUSTED_SECOND;

    // logging before syslog restart
    makeLog();
    Assert.assertNotNull("No message was logged into the UDP syslog",
            udpEvents.poll(deliveryTimeout, TimeUnit.MILLISECONDS));
    Assert.assertNotNull("No message was logged into the TCP syslog",
            tcpEvents.poll(deliveryTimeout, TimeUnit.MILLISECONDS));

    // Same stop -> verify silence -> start -> verify delivery cycle, run twice.
    for (final String restart : new String[] { "first", "second" }) {
        stopSyslogServers();

        makeLog_syslogIsOffline();
        Assert.assertNull("Message was logged into the UDP syslog even if syslog server should be stopped",
                udpEvents.poll(silenceTimeout, TimeUnit.MILLISECONDS));
        Assert.assertNull("Message was logged into the TCP syslog even if syslog server should be stopped",
                tcpEvents.poll(silenceTimeout, TimeUnit.MILLISECONDS));

        startSyslogServers(host);

        // logging after this syslog restart
        makeLog();
        Assert.assertNotNull("No message was logged into the UDP syslog after " + restart + " syslog server restart",
                udpEvents.poll(deliveryTimeout, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("No message was logged into the TCP syslog after " + restart + " syslog server restart",
                tcpEvents.poll(deliveryTimeout, TimeUnit.MILLISECONDS));
    }
}
 
源代码17 项目: samza   文件: BlockingEnvelopeMap.java
/**
 * {@inheritDoc}
 *
 * For each requested partition, buffered messages are drained without
 * blocking when any are present. Otherwise, depending on {@code timeout},
 * the call returns nothing for that partition ({@code timeout == 0}), blocks
 * until a message arrives or the partition is at head
 * ({@code timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES}), or
 * blocks for the time left until the overall deadline ({@code timeout > 0}).
 * Partitions that yielded no messages are absent from the returned map.
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
  // One deadline shared by all partitions, computed once at entry.
  long stopTime = clock.currentTimeMillis() + timeout;
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();

  metrics.incPoll();

  for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
    // NOTE(review): assumes every requested partition has a queue in bufferedMessages;
    // a missing entry would throw a NullPointerException on the next line. Confirm against callers.
    BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
    List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());

    if (queue.size() > 0) {
      // Fast path: messages are already buffered, take them all without blocking.
      queue.drainTo(outgoingList);
    } else if (timeout != 0) {
      IncomingMessageEnvelope envelope = null;

      // How long we can legally block (if timeout > 0)
      long timeRemaining = stopTime - clock.currentTimeMillis();

      if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
        // Block until we get at least one message, or until we catch up to
        // the head of the stream.
        while (envelope == null && !isAtHead(systemStreamPartition)) {

          // Check for consumerFailure and throw exception
          if (this.failureCause != null) {
            String message = String.format("%s: Consumer has stopped.", this);
            throw new SamzaException(message, this.failureCause);
          }

          metrics.incBlockingPoll(systemStreamPartition);
          // Wake up every second so the head and failure checks above are re-evaluated.
          envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
        }
      } else if (timeout > 0 && timeRemaining > 0) {
        // Block until we get at least one message.
        metrics.incBlockingTimeoutPoll(systemStreamPartition);
        envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
      }

      // If we got a message, add it.
      if (envelope != null) {
        outgoingList.add(envelope);
        // Drain any remaining messages without blocking.
        queue.drainTo(outgoingList);
      }
    }

    // Only partitions that produced messages are reported and passed to subtractSizeOnQDrain.
    if (outgoingList.size() > 0) {
      messagesToReturn.put(systemStreamPartition, outgoingList);
      subtractSizeOnQDrain(systemStreamPartition, outgoingList);
    }
  }

  return messagesToReturn;
}
 
源代码18 项目: codebuff   文件: Queues.java
/**
 * Variant of {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)} that never
 * throws {@code InterruptedException}. If the calling thread is interrupted while waiting, the
 * interruption is remembered, draining carries on until the deadline or the requested count is
 * reached, and the thread's interrupt status is set again just before returning.
 *
 * @param q the blocking queue to be drained
 * @param buffer where to add the transferred elements
 * @param numElements the number of elements to be waited for
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
 * @return the number of elements transferred
 */

@Beta
@CanIgnoreReturnValue
public static <E> int drainUninterruptibly(
  BlockingQueue<E> q,
  Collection<? super E> buffer,
  int numElements,
  long timeout,
  TimeUnit unit) {
  Preconditions.checkNotNull(buffer);
  final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
  int transferred = 0;
  boolean wasInterrupted = false;
  try {
    while (transferred < numElements) {
      // Bulk transfer first: #drainTo can be cheaper than repeated #poll when several
      // elements are already waiting (e.g. LinkedBlockingQueue#drainTo locks only once).
      transferred += q.drainTo(buffer, numElements - transferred);
      if (transferred >= numElements) {
        break;
      }

      // Not enough elements were immediately available: wait for one more, retrying the
      // timed poll after every interruption. The variable is assigned exactly once, by
      // the poll that completes without being interrupted.
      E polled;
      for (;;) {
        try {
          polled = q.poll(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
          break;
        } catch (InterruptedException ex) {
          wasInterrupted = true;
        }
      }

      if (polled == null) {
        // Deadline reached with nothing more to take.
        break;
      }
      buffer.add(polled);
      transferred++;
    }
  } finally {
    if (wasInterrupted) {
      Thread.currentThread().interrupt();
    }
  }
  return transferred;
}
 
源代码19 项目: flink   文件: ExecutionGraphSchedulingTest.java
/**
 * Verifies that a single failed slot future aborts the deployment: no task is
 * submitted and the slots whose futures had already completed are no longer alive.
 */
@Test
public void testOneSlotFailureAbortsDeploy() throws Exception {

	// Job topology: (source) --[pipelined, pointwise]--> (target)

	final int parallelism = 6;

	final JobVertex source = new JobVertex("source");
	source.setParallelism(parallelism);
	source.setInvokableClass(NoOpInvokable.class);

	final JobVertex target = new JobVertex("target");
	target.setParallelism(parallelism);
	target.setInvokableClass(NoOpInvokable.class);

	target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

	final JobID jobId = new JobID();
	final JobGraph graph = new JobGraph(jobId, "test", source, target);
	graph.setScheduleMode(ScheduleMode.EAGER);
	graph.setAllowQueuedScheduling(true);

	// Slots, their futures, and the slot provider that hands the futures out.

	final InteractionsCountingTaskManagerGateway gateway = createTaskManager();
	final BlockingQueue<AllocationID> releasedAllocations = new ArrayBlockingQueue<>(parallelism);
	final TestingSlotOwner owner = new TestingSlotOwner();
	// Record the allocation id of every slot handed back to the owner.
	owner.setReturnAllocatedSlotConsumer(
		(LogicalSlot returned) -> releasedAllocations.offer(returned.getAllocationId()));

	final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
	final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];

	@SuppressWarnings({"unchecked", "rawtypes"})
	final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
	@SuppressWarnings({"unchecked", "rawtypes"})
	final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];

	for (int subtask = 0; subtask < parallelism; subtask++) {
		sourceSlots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());
		targetSlots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());

		sourceFutures[subtask] = new CompletableFuture<>();
		targetFutures[subtask] = new CompletableFuture<>();
	}

	ProgrammedSlotProvider provider = new ProgrammedSlotProvider(parallelism);
	provider.addSlots(source.getID(), sourceFutures);
	provider.addSlots(target.getID(), targetFutures);

	final ExecutionGraph executionGraph = createExecutionGraph(graph, provider);
	executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

	// Complete every second future (even indices) up front.
	for (int even = 0; even < parallelism; even += 2) {
		sourceFutures[even].complete(sourceSlots[even]);
		targetFutures[even].complete(targetSlots[even]);
	}

	// Kick off the scheduling.
	executionGraph.scheduleForExecution();

	// Fail one of the still-pending slot futures.
	sourceFutures[1].completeExceptionally(new TestRuntimeException());

	// Wait until the job as a whole has terminated.
	executionGraph.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);

	// Give each slot up to two seconds to be returned; the results are not checked here.
	for (int waited = 0; waited < parallelism; waited++) {
		releasedAllocations.poll(2000L, TimeUnit.MILLISECONDS);
	}

	// No deployment call may have happened.
	assertThat(gateway.getSubmitTaskCount(), is(0));

	// Every slot whose future had been completed must have been released.
	for (int even = 0; even < parallelism; even += 2) {
		assertFalse(sourceSlots[even].isAlive());
		assertFalse(targetSlots[even].isAlive());
	}
}
 
源代码20 项目: android-uiconductor   文件: MinicapServerManager.java
/**
 * Starts a thread that keeps forwarding images from the queue registered for
 * the server's port to the sessions registered for that port, and records
 * the thread in {@code portSendImgThreadMapping}.
 *
 * When no new image arrives within the poll timeout, a copy of the last
 * image seen is sent again.
 *
 * @param server the server whose port selects the image queue and sessions
 */
public void sendImage(MinicapJettyServer server) {
  final Integer port = server.getPort();
  final BlockingQueue<byte[]> frames = portQueueMapping.get(port);
  Thread sender =
      new Thread() {
        @Override
        public void run() {
          // Last image received; starts out as an empty array.
          byte[] lastFrame = {};
          while (!isInterrupted()) {
            try {
              if (frames == null) {
                // No queue was registered for this port when the thread was created.
                Thread.sleep(WAIT_FOR_IMG_QUEUE.toMillis());
                continue;
              }

              final byte[] polled =
                  frames.poll(IMG_POLL_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
              final byte[] outgoing;
              if (polled == null) {
                // Timed out: resend a copy of the previous image.
                outgoing = lastFrame.clone();
              } else {
                outgoing = polled;
                lastFrame = polled.clone();
              }

              // not ready
              if (port == null) {
                return;
              }

              // Send the image to every open WebSocket session of this port.
              ConcurrentSet<Session> sessions = portSessionMapping.get(port);

              if (sessions == null) {
                continue;
              }

              for (Session session : sessions) {
                if (session.isOpen()) {
                  session.getRemote().sendBytes(ByteBuffer.wrap(outgoing));
                } else {
                  // Closed sessions are dropped from the mapping.
                  portSessionMapping.get(port).remove(session);
                }
              }
            } catch (Throwable e) {
              // Any failure sets this thread's interrupt flag, which ends the loop above.
              interrupt();
              logger.info("No data from minicap.");
            }
          }

          logger.info(String.format("Thread id(%s) killed.", this.getId()));
        }
      };
  sender.start();
  portSendImgThreadMapping.put(port, sender);
}