类java.util.concurrent.BlockingQueue源码实例Demo

下面列出了 java.util.concurrent.BlockingQueue 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

/**
 * Sweeps every pooled connection queue, shutting down and terminating connections whose last
 * use is older than {@code idleExpirationMillis}, and returning the others to their queue.
 */
private void cleanupExpiredSockets() {
    for (final BlockingQueue<EndpointConnection> pooled : connectionQueueMap.values()) {
        final List<EndpointConnection> survivors = new ArrayList<>();

        // Drain the queue completely; connections that are still fresh are re-added afterwards.
        for (EndpointConnection candidate = pooled.poll(); candidate != null; candidate = pooled.poll()) {
            final long lastUsed = candidate.getLastTimeUsed();
            final boolean stillFresh = lastUsed >= System.currentTimeMillis() - idleExpirationMillis;
            if (stillFresh) {
                survivors.add(candidate);
                continue;
            }

            // Idle for too long: attempt a protocol-level shutdown, then terminate regardless
            // of whether that shutdown succeeded.
            try {
                candidate.getSocketClientProtocol().shutdown(candidate.getPeer());
            } catch (final Exception e) {
                logger.debug("Failed to shut down {} using {} due to {}",
                        candidate.getSocketClientProtocol(), candidate.getPeer(), e);
            }
            terminate(candidate);
        }

        pooled.addAll(survivors);
    }
}
 
/**
 * Marks this instance as shut down, stops the task executor, clears the peer selector,
 * interrupts the communications session of every active connection and terminates every
 * connection still waiting in a pooled queue.
 */
public void shutdown() {
    shutdown = true;
    taskExecutor.shutdown();
    peerSelector.clear();

    // Connections currently in use: interrupt their communications session.
    for (final EndpointConnection active : activeConnections) {
        active.getPeer().getCommunicationsSession().interrupt();
    }

    // Pooled connections: drain each queue and terminate whatever comes out.
    for (final BlockingQueue<EndpointConnection> pooled : connectionQueueMap.values()) {
        for (EndpointConnection idle = pooled.poll(); idle != null; idle = pooled.poll()) {
            terminate(idle);
        }
    }
}
 
源代码3 项目: twister2   文件: StreamingAllSharingExecutor2.java
/**
 * Creates {@code numThreads} communication workers sharing one task queue filled with the
 * given node instances. Workers 1..numThreads-1 are submitted to {@code threads}; worker 0 is
 * only constructed here and handed back with the rest.
 *
 * @param nodes node instances to place on the shared queue
 * @return all created workers, indexed by worker slot
 */
private CommunicationWorker[] scheduleWaitFor(Map<Integer, INodeInstance> nodes) {
  // The queue is sized at twice the number of nodes.
  BlockingQueue<INodeInstance> pending = new ArrayBlockingQueue<>(nodes.size() * 2);
  pending.addAll(nodes.values());

  CommunicationWorker[] created = new CommunicationWorker[numThreads];
  created[0] = new CommunicationWorker(pending);

  // The latch counts only the submitted workers, hence numThreads - 1.
  doneSignal = new CountDownLatch(numThreads - 1);
  int slot = 1;
  while (slot < numThreads) {
    CommunicationWorker worker = new CommunicationWorker(pending);
    created[slot] = worker;
    threads.submit(worker);
    slot++;
  }
  return created;
}
 
源代码4 项目: disruptor   文件: PushPullBlockingQueueTest.java
/**
 * Checks that {@code peek()} returns null on an empty queue, keeps returning the first
 * offered element while the queue fills, and follows the head as elements are polled.
 */
@Test
public void testPeek() {
    final int cap = 10;
    BlockingQueue<Integer> queue = new PushPullBlockingQueue<Integer>(cap);

    // An empty queue must answer peek() with null rather than throwing.
    try {
        Assert.assertNull(queue.peek());
    } catch (NoSuchElementException unexpected) {
        Assert.fail();
    }

    // While filling, the head stays at the first value offered (0).
    int next = 0;
    while (next < cap) {
        queue.offer(Integer.valueOf(next));
        Assert.assertEquals(Integer.valueOf(0), queue.peek());
        next++;
    }

    // While draining, peek() must show each value in insertion order before it is removed.
    for (int expected = 0; expected < cap; expected++) {
        Assert.assertEquals(Integer.valueOf(expected), queue.peek());
        queue.poll();
    }
}
 
源代码5 项目: nifi   文件: SocketChannelDispatcher.java
/**
 * Creates a dispatcher backed by the given buffer pool and event queue.
 *
 * @param eventFactory   factory stored for later use by this dispatcher
 * @param handlerFactory factory stored for later use by this dispatcher
 * @param bufferPool     pool of buffers; must be non-null and hold exactly {@code maxConnections} buffers
 * @param events         queue stored as the destination for events
 * @param logger         component logger
 * @param maxConnections maximum number of connections; also the capacity of the internal key queue
 * @param sslContext     SSL context to store (may be null as far as this constructor is concerned)
 * @param clientAuth     client authentication setting to store
 * @param charset        character set to store
 * @throws IllegalArgumentException if {@code bufferPool} is null, empty, or its size differs
 *                                  from {@code maxConnections}
 */
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                               final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
                               final BlockingQueue<ByteBuffer> bufferPool,
                               final BlockingQueue<E> events,
                               final ComponentLog logger,
                               final int maxConnections,
                               final SSLContext sslContext,
                               final SslContextFactory.ClientAuth clientAuth,
                               final Charset charset) {
    // Validate before touching any state. Doing this first also guarantees maxConnections > 0,
    // so the LinkedBlockingQueue below cannot reject its capacity with a less helpful error.
    if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) {
        throw new IllegalArgumentException(
                "A pool of available ByteBuffers equal to the maximum number of connections is required");
    }

    this.eventFactory = eventFactory;
    this.handlerFactory = handlerFactory;
    this.bufferPool = bufferPool;
    this.events = events;
    this.logger = logger;
    this.maxConnections = maxConnections;
    this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
    this.sslContext = sslContext;
    this.clientAuth = clientAuth;
    this.charset = charset;
}
 
源代码6 项目: disruptor   文件: MPMCBlockingQueueTest.java
/**
 * Verifies {@code peek()} on an MPMC queue: null when empty, the first offered element while
 * filling, and the current head while elements are removed one by one.
 */
@Test
public void testPeek() {
    final int cap = 10;
    BlockingQueue<Integer> queue = new MPMCBlockingQueue<>(cap);

    // peek() on an empty queue is expected to return null, not throw.
    try {
        Assert.assertNull(queue.peek());
    } catch (NoSuchElementException unexpected) {
        Assert.fail();
    }

    // Fill the queue; the head never moves away from 0.
    final Integer first = Integer.valueOf(0);
    for (int value = 0; value < cap; value++) {
        queue.offer(Integer.valueOf(value));
        Assert.assertEquals(first, queue.peek());
    }

    // Drain the queue; each peek() must match the element about to be polled.
    int expected = 0;
    while (expected < cap) {
        Assert.assertEquals(Integer.valueOf(expected), queue.peek());
        queue.poll();
        expected++;
    }
}
 
源代码7 项目: lutece-core   文件: DaemonSchedulerTest.java
/**
 * Schedules the "JUNIT" daemon with a 500 ms delay and checks that it has not run right after
 * scheduling, that at least 500 ms have elapsed once {@code go()} returns, and that it has run
 * after {@code waitForCompletion()}.
 *
 * @param shouldThrow value passed to {@code TestDaemon.setRunThrows}
 */
private void testScheduleDelay( boolean shouldThrow ) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException,
        BrokenBarrierException, TimeoutException
{
    BlockingQueue<DaemonEntry> pendingEntries = new LinkedBlockingQueue<>( );
    ExecutorService singleThread = Executors.newSingleThreadExecutor( );
    DaemonScheduler scheduler = new DaemonScheduler( pendingEntries, singleThread );
    try
    {
        DaemonEntry junitEntry = getDaemonEntry( "JUNIT" );
        TestDaemon daemon = (TestDaemon) junitEntry.getDaemon( );
        daemon.setRunThrows( shouldThrow );

        Instant scheduledAt = Instant.now( );
        scheduler.schedule( junitEntry, 500L, TimeUnit.MILLISECONDS );
        assertFalse( daemon.hasRun( ) );

        // NOTE(review): go( ) presumably blocks until the daemon has been started - confirm in TestDaemon
        daemon.go( );
        long elapsedMillis = Duration.between( scheduledAt, Instant.now( ) ).toMillis( );
        assertTrue( elapsedMillis >= 500L );

        daemon.waitForCompletion( );
        assertTrue( daemon.hasRun( ) );
    }
    finally
    {
        // Always stop the scheduler, even when an assertion above fails.
        scheduler.shutdown( );
    }
}
 
源代码8 项目: binlake   文件: BinlogWorker.java
/**
 * Logs the state of the binlog position queue at debug level: its head element, its size,
 * and the size of the throttler queue when one is set. Does nothing unless debug logging
 * is enabled.
 *
 * @param logPositions queue of binlog positions to describe
 */
private void debugLogPosition(ConcurrentLinkedQueue<LogPosition> logPositions) {
    if (!LogUtils.debug.isDebugEnabled()) {
        return;
    }

    // Walk the queue once: report the first element as the head and count the rest.
    int total = 0;
    for (LogPosition position : logPositions) {
        if (total == 0) {
            LogUtils.debug.debug(host + " truncLogPosQueue logPositions head is " + position);
        }
        total++;
    }
    LogUtils.debug.debug(host + " truncLogPosQueue logPositions queue size " + total);

    // Read the field once so the null check and the size() call see the same reference.
    BlockingQueue<Object> throttlerQueue = this.throttler;
    if (throttlerQueue != null) {
        LogUtils.debug.debug(host + " throttler queue size " + throttlerQueue.size());
    }
}
 
源代码9 项目: incubator-gobblin   文件: CouchbaseWriterTest.java
/**
 * Removes elements from the head of {@code queue} until its remaining capacity reaches
 * {@code threshold}. Before each removal the head element's future is awaited for at most
 * {@code sleepTime}; if that wait fails for any reason the element is recorded in
 * {@code failedFutures}. The element is removed either way.
 *
 * <p>Returns early when the queue is empty, because nothing further can be removed to free
 * capacity. If the thread is interrupted while waiting, draining continues and the interrupt
 * status is restored before returning.
 *
 * @param queue         queue of (document, future) pairs to drain
 * @param threshold     remaining capacity at which draining stops
 * @param sleepTime     maximum time to wait on each future; nothing is removed when it is not positive
 * @param sleepUnit     unit of {@code sleepTime}
 * @param failedFutures collects the pairs whose future did not complete successfully in time
 */
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue, int threshold, long sleepTime,
    TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
  boolean interrupted = false;
  try {
    while (queue.remainingCapacity() < threshold) {
      Pair<AbstractDocument, Future> topElement = queue.peek();
      if (topElement == null) {
        // Empty queue: the threshold cannot be reached by removing more, so stop instead of spinning.
        return;
      }
      // NOTE(review): with a non-positive sleepTime nothing is removed here, so the loop only
      // ends once another thread drains the queue - confirm this is intended.
      if (sleepTime > 0) {
        try {
          topElement.getSecond().get(sleepTime, sleepUnit);
        } catch (InterruptedException ie) {
          // Remember the interrupt; restoring it immediately would make every later get() fail.
          interrupted = true;
          failedFutures.add(topElement);
        } catch (Exception te) {
          failedFutures.add(topElement);
        }
        queue.poll();
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
 
源代码10 项目: beast   文件: OffsetMapQueueSinkTest.java
/**
 * Pushes a single record into the sink and checks that exactly one offset map lands on the
 * queue, holding one entry for partition 0 of "default-topic" with offset 1.
 */
@Test
public void shouldPushMessageToQueue() throws InterruptedException {
    BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> queue = new LinkedBlockingQueue<>();
    queueSink = new OffsetMapQueueSink(queue, queueConfig);
    Records messages = new Records(Collections.singletonList(new Record(offsetInfo, new HashMap<>())));

    Status status = queueSink.push(messages);

    assertTrue(status.isSuccess());
    assertEquals(1, queue.size());
    Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = queue.take();
    assertEquals(1, partitionsCommitOffset.size());
    Map.Entry<TopicPartition, OffsetAndMetadata> offset = partitionsCommitOffset.entrySet().iterator().next();
    // Expected value first, actual second, so a failure message reports them the right way round.
    assertEquals("default-topic", offset.getKey().topic());
    assertEquals(0, offset.getKey().partition());
    assertEquals(1, offset.getValue().offset());
}
 
源代码11 项目: actioncable-client-java   文件: ConnectionTest.java
/**
 * Starts a mock server that answers with HTTP 500 and checks that opening a connection to it
 * triggers the listener's {@code onFailure} callback.
 */
@Test(timeout = TIMEOUT)
public void shouldFireOnFailureWhenInternalServerErrorReceived() throws InterruptedException, IOException {
    final MockWebServer mockWebServer = new MockWebServer();
    final MockResponse response = new MockResponse();
    response.setResponseCode(500);
    response.setStatus("HTTP/1.1 500 Internal Server Error");
    mockWebServer.enqueue(response);
    mockWebServer.start();

    // Shut the server down in finally so a failing assertion or exception does not leak it.
    try {
        final URI uri = mockWebServer.url("/").uri();
        final Connection connection = new Connection(uri, new Consumer.Options());

        final BlockingQueue<String> events = new LinkedBlockingQueue<String>();

        connection.setListener(new DefaultConnectionListener() {
            @Override
            public void onFailure(Exception e) {
                events.offer("onFailed");
            }
        });
        connection.open();

        // take() blocks until the callback fires; the test timeout bounds the wait.
        assertThat(events.take(), is("onFailed"));
    } finally {
        mockWebServer.shutdown();
    }
}
 
源代码12 项目: disruptor   文件: PushPullBlockingQueueTest.java
/**
 * Fills a queue of capacity 16, checks that one more {@code offer} is rejected, and then
 * checks that the elements come back out in insertion order.
 */
@Test
public void testOffer() {
    final int cap = 16;
    BlockingQueue<Integer> queue = new PushPullBlockingQueue<Integer>(cap);

    // Fill the queue to capacity.
    int value = 0;
    while (value < cap) {
        queue.offer(Integer.valueOf(value));
        value++;
    }

    // A full queue must refuse a further element.
    final boolean acceptedWhenFull = queue.offer(Integer.valueOf(cap));
    Assert.assertFalse(acceptedWhenFull);

    // Elements are returned first-in, first-out.
    for (int expected = 0; expected < cap; expected++) {
        Assert.assertEquals(Integer.valueOf(expected), queue.poll());
    }
}
 
源代码13 项目: xio   文件: IpRules.java
/**
 * Removes the rule for the given IP address, if one exists, after queueing a matching
 * remove message on {@code workLoad}.
 *
 * @param ipRule   rule whose address should be removed
 * @param workLoad queue that receives the remove message; {@code put} may block
 * @return a successful result when the rule was removed; a failed result when no rule exists
 *         for the address, the address bytes are invalid, or the thread was interrupted while
 *         queueing the message
 */
public Result remove(IpRule ipRule, BlockingQueue<UpdateMessage> workLoad) {
  try {
    InetAddress address = InetAddress.getByAddress(ipRule.getIpAddress());
    log.debug("address {}", address.getHostAddress());
    if (!rules.containsKey(address)) {
      return new Result(false, "nothing to remove for address " + address.getHostAddress());
    }
    // Queue the message first; the rule is dropped only once the message has been accepted.
    workLoad.put(UpdateMessage.removeIpRule(address));
    rules.remove(address);
  } catch (UnknownHostException e) {
    log.error("removeIpRule couldn't remove {}", ipRule, e);
    return new Result(false, e.getMessage());
  } catch (InterruptedException e) {
    // Restore the interrupt status so callers further up can still observe it.
    Thread.currentThread().interrupt();
    log.error("removeIpRule couldn't remove {}", ipRule, e);
    return new Result(false, e.getMessage());
  }
  return new Result(true, "");
}
 
源代码14 项目: nuls   文件: AccountResource.java
/**
 * Locks an account: removes it from the account cache and, if an unlock task for it is still
 * waiting in the scheduler queue, removes that task and its map entry.
 *
 * @param address address of the account to lock
 * @return a failure result when the account does not exist, otherwise a success result whose
 *         data is a map with {@code "value" -> true}
 */
@POST
@Path("/lock/{address}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "[锁账户] 清除缓存的锁定账户", notes = "Clear the cache unlock account.")
public RpcClientResult lock(@ApiParam(name = "address", value = "账户地址", required = true) @PathParam("address") String address) {
    Account account = accountService.getAccount(address).getData();
    if (account == null) {
        return Result.getFailed(AccountErrorCode.ACCOUNT_NOT_EXIST).toRpcClientResult();
    }
    accountCacheService.removeAccount(account.getAddress());

    BlockingQueue<Runnable> scheduledTasks = scheduler.getQueue();
    String addressKey = account.getAddress().toString();
    Runnable pendingUnlock = (Runnable) accountUnlockSchedulerMap.get(addressKey);
    // Only tasks still waiting in the scheduler queue are removed, along with their map entry.
    boolean stillQueued = scheduledTasks.contains(pendingUnlock);
    if (stillQueued) {
        scheduler.remove(pendingUnlock);
        accountUnlockSchedulerMap.remove(addressKey);
    }

    Map<String, Boolean> payload = new HashMap<>();
    payload.put("value", true);
    return Result.getSuccess().setData(payload).toRpcClientResult();
}
 
源代码15 项目: localization_nifi   文件: ListenSyslog.java
/**
 * Builds the channel dispatcher for the given protocol: a datagram dispatcher when the
 * protocol equals the UDP value, otherwise a socket-channel dispatcher that also receives an
 * SSL context when an SSL context service is supplied.
 *
 * @param protocol          protocol name compared against {@code UDP_VALUE}
 * @param bufferPool        buffer pool handed to the dispatcher
 * @param events            queue handed to the dispatcher for raw syslog events
 * @param maxConnections    connection limit handed to the socket-channel dispatcher
 * @param sslContextService optional source of the SSL context; may be null
 * @param charset           character set handed to the socket-channel dispatcher
 * @return the dispatcher for the requested protocol
 * @throws IOException declared by the original signature
 */
protected ChannelDispatcher createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
                                                final BlockingQueue<RawSyslogEvent> events, final int maxConnections,
                                                final SSLContextService sslContextService, final Charset charset) throws IOException {

    final EventFactory<RawSyslogEvent> eventFactory = new RawSyslogEventFactory();

    if (UDP_VALUE.getValue().equals(protocol)) {
        return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
    }

    // An SSLContext is created, with client authentication required, only when a service was provided.
    final SSLContext sslContext = sslContextService == null
            ? null
            : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);

    final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
    return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charset);
}
 
源代码16 项目: simple-robot-core   文件: BaseConfiguration.java
/**
 * Returns the blocking queue used by the thread pool.
 *
 * <p>An already-set queue is returned as is. Otherwise, when a class name is configured in
 * {@code workQueueFrom}, that class is loaded and instantiated, and the instance is cached
 * and returned.
 *
 * @return the work queue, or {@code null} when neither a queue nor a class name is configured
 * @throws ConfigurationException if the configured class cannot be loaded, instantiated or
 *                                used as a {@code BlockingQueue}
 */
public BlockingQueue<Runnable> getWorkQueue() {
    if (this.workQueue != null) {
        return this.workQueue;
    }
    if (this.workQueueFrom == null) {
        return null;
    }
    try {
        Object created = Class.forName(workQueueFrom).newInstance();
        this.workQueue = (BlockingQueue<Runnable>) created;
        return this.workQueue;
    } catch (Exception e) {
        throw new ConfigurationException("无法读取包路径'" + workQueueFrom + "'来作为'" + BlockingQueue.class + "'实例。", e);
    }
}
 
源代码17 项目: dawnsci   文件: HDF5File.java
/**
 * Finish all writes (block until it is done).
 * <p>
 * Waits, polling every 10 ms, until the service's queue is empty or the service has
 * terminated, then flushes the datasets. An interrupt received while waiting does not cut the
 * wait short; it is recorded and the thread's interrupt status is restored once the flush has
 * finished, so callers can still observe it.
 */
public synchronized void flushWrites() {
	boolean interrupted = false;
	try {
		if (service != null) {
			BlockingQueue<Runnable> queue = service.getQueue();
			final long milli = 10; // period to sleep between checking for empty queue
			// NOTE(review): an empty queue only shows that nothing is waiting; a task that has
			// already been taken off the queue may still be running - confirm this is sufficient.
			while (!service.isTerminated() && queue.peek() != null) {
				try {
					Thread.sleep(milli);
				} catch (InterruptedException e) {
					// Keep waiting, but remember the interrupt. Restoring it here would make
					// every following sleep() throw immediately and turn this into a busy loop.
					interrupted = true;
				}
			}
		}
		flushDatasets();
	} finally {
		if (interrupted) {
			Thread.currentThread().interrupt();
		}
	}
}
 
源代码18 项目: jlogstash-input-plugin   文件: File.java
/**
 * Assigns a file to one of the reader queues, chosen from the hash of its name, creating the
 * queue on first use.
 *
 * @param fileName name of the file to enqueue for reading
 */
public void addFile(String fileName){
	// Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE) is still
	// negative and would give a negative index. Every other hash maps to the same index as before.
	int index = Math.abs(fileName.hashCode() % readFileThreadNum);
	BlockingQueue<String> readQueue = threadReadFileMap.get(index);
	
	if(readQueue == null){
		readQueue = new LinkedBlockingQueue<>();
		threadReadFileMap.put(index, readQueue);
	}
	
	readQueue.offer(fileName);
}
 
/**
 * Buffers ten events on the publisher, then acknowledges each delivered batch and follows
 * every acknowledgement with one for an unrelated flow. All ten records are expected to reach
 * the subscriber regardless of the unrelated acknowledgements.
 */
@Test
public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliveryQueueIsNotEmpty()
        throws InterruptedException {
    FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
    FanOutRecordsPublisher.RecordFlow flow =
            new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
    final int[] deliveredCount = { 0 };
    BlockingQueue<BatchUniqueIdentifier> pendingAcks = new LinkedBlockingQueue<>();

    fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
        @Override public void onSubscribe(Subscription subscription) {}
        @Override public void onNext(RecordsRetrieved recordsRetrieved) {
            deliveredCount[0]++;
            // Queue the identifier so the loop below can acknowledge it later.
            pendingAcks.add(recordsRetrieved.batchUniqueIdentifier());
        }
        @Override public void onError(Throwable throwable) {}
        @Override public void onComplete() {}
    });

    // Buffer events 1..10 against the same record flow.
    for (int sequence = 1; sequence <= 10; sequence++) {
        fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
                new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), sequence + "", flow.getSubscribeToShardId()),
                flow);
    }

    // For each of the ten deliveries send the genuine ack, then one for a flow the publisher
    // does not know about.
    for (int round = 0; round < 10; round++) {
        final BatchUniqueIdentifier genuineAck = pendingAcks.take();
        fanOutRecordsPublisher
                .evictAckedEventAndScheduleNextEvent(() -> genuineAck);
        fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
                () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
    }
    assertEquals(10, deliveredCount[0]);
}
 
源代码20 项目: flink   文件: StreamIterationTail.java
/**
 * Initialises the iteration tail: looks up the feedback queue registered under the broker id
 * built from the job id, iteration id and subtask index, and installs a {@code RecordPusher}
 * whose output writes to that queue.
 *
 * @throws Exception if the task configuration has no iteration id
 */
@Override
public void init() throws Exception {

	final String iterationId = getConfiguration().getIterationId();
	if (iterationId == null || iterationId.isEmpty()) {
		throw new Exception("Missing iteration ID in the task configuration");
	}

	final String feedbackBrokerId = StreamIterationHead.createBrokerIdString(
			getEnvironment().getJobID(),
			iterationId,
			getEnvironment().getTaskInfo().getIndexOfThisSubtask());

	final long waitTimeForFeedback = getConfiguration().getIterationWaitTime();

	LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), feedbackBrokerId);

	@SuppressWarnings("unchecked")
	BlockingQueue<StreamRecord<IN>> feedbackQueue =
			(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(feedbackBrokerId);

	LOG.info("Iteration tail {} acquired feedback queue {}", getName(), feedbackBrokerId);

	RecordPusher<IN> pusher = new RecordPusher<>();
	pusher.setup(this, getConfiguration(), new IterationTailOutput<>(feedbackQueue, waitTimeForFeedback));
	this.headOperator = pusher;

	// super.init() must come last: it relies on this.headOperator already being set
	super.init();
}
 
源代码21 项目: datashare   文件: CommonMode.java
/**
 * Loads the class with the given name and returns it typed as a {@code BlockingQueue<String>}
 * implementation. The cast is unchecked, so the class is not verified to implement
 * {@code BlockingQueue} here.
 *
 * @param className fully qualified name of the class to load
 * @return the loaded class
 * @throws RuntimeException wrapping the {@code ClassNotFoundException} when the class cannot be found
 */
private Class<? extends BlockingQueue<String>> getBlockingQueueClassInstance(String className) {
    final Class<?> loaded;
    try {
        loaded = Class.forName(className);
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
    return (Class<? extends BlockingQueue<String>>) loaded;
}
 
源代码22 项目: ProjectX   文件: JobExecutor.java
/**
 * Creates a {@code JobExecutor} with the given pool settings and, on SDK level 9 or above,
 * applies the core-thread timeout flag.
 *
 * @param corePoolSize           core pool size passed to the executor
 * @param maximumPoolSize        maximum pool size passed to the executor
 * @param keepAliveTime          keep-alive time passed to the executor
 * @param unit                   unit of {@code keepAliveTime}
 * @param workQueue              queue holding tasks before they run
 * @param threadFactory          factory for the executor's threads
 * @param allowCoreThreadTimeOut value forwarded to {@code allowCoreThreadTimeOut}
 * @return the configured executor
 */
static Executor getExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory, boolean allowCoreThreadTimeOut) {
    final JobExecutor jobExecutor = new JobExecutor(
            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    // The flag is applied only when the platform reports SDK level 9 or higher.
    if (Build.VERSION.SDK_INT >= 9) {
        jobExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    }
    return jobExecutor;
}
 
/**
 * Stores the collaborators of this acceptor; no other work is done here.
 *
 * @param tasksToProcess        processor stored as the sink for tasks to process
 * @param idToCompletedTasksMap map from an id to its queue of completed tasks
 * @param idToRSocketMap        map from an id to its RSocket
 */
TasksAcceptor(
    UnicastProcessor<Task> tasksToProcess,
    ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap,
    ConcurrentMap<String, RSocket> idToRSocketMap) {
  this.idToRSocketMap = idToRSocketMap;
  this.idToCompletedTasksMap = idToCompletedTasksMap;
  this.tasksToProcess = tasksToProcess;
}
 
/**
 * Returns the next available message, visiting the per-consumer buffers in iterator order and
 * resuming where the previous call stopped.
 *
 * @return the first message found, or {@code null} when the iterator is exhausted without
 *         finding one
 */
@Override
public Message poll() {
    // Start a fresh pass when there is no iterator yet or the previous pass is finished.
    final boolean passInProgress = consumerIdIterator != null && consumerIdIterator.hasNext();
    if (!passInProgress) {
        consumerIdIterator = messageBuffer.keySet().iterator();
    }

    while (consumerIdIterator.hasNext()) {
        final VirtualSpoutIdentifier consumerId = consumerIdIterator.next();
        final BlockingQueue<Message> buffer = messageBuffer.get(consumerId);

        // The key no longer maps to a buffer: restart the pass with a fresh iterator.
        if (buffer == null) {
            logger.debug("Non-existent queue found, resetting iterator.");
            consumerIdIterator = messageBuffer.keySet().iterator();
            continue;
        }

        final Message polled = buffer.poll();
        if (polled != null) {
            return polled;
        }
    }
    return null;
}
 
源代码25 项目: rya   文件: PeriodicQueryPrunerExecutor.java
/**
 * Creates the executor with a fixed thread pool of {@code numThreads} threads and an empty
 * list of pruners.
 *
 * @param periodicStorage storage stored for later use
 * @param client          Fluo client stored for later use
 * @param numThreads      size of the fixed thread pool; must be greater than zero
 * @param bins            queue of node bins stored for later use
 * @throws IllegalArgumentException if {@code numThreads} is not positive
 */
public PeriodicQueryPrunerExecutor(final PeriodicQueryResultStorage periodicStorage, final FluoClient client, final int numThreads,
        final BlockingQueue<NodeBin> bins) {
    Preconditions.checkArgument(numThreads > 0);

    // Collaborators handed in by the caller.
    this.periodicStorage = periodicStorage;
    this.client = client;
    this.bins = bins;

    // State owned by this instance.
    this.numThreads = numThreads;
    this.executor = Executors.newFixedThreadPool(numThreads);
    this.pruners = new ArrayList<>();
}
 
源代码26 项目: twister2   文件: StreamingSharingExecutor.java
/**
 * Puts all node instances on a shared queue, prepares each of them with the current config,
 * and starts {@code numThreads} stream workers on that queue.
 *
 * @param nodes node instances to schedule
 */
private void schedulerExecution(Map<Integer, INodeInstance> nodes) {
  // The shared queue is sized at twice the number of nodes.
  BlockingQueue<INodeInstance> pending = new ArrayBlockingQueue<>(nodes.size() * 2);
  pending.addAll(nodes.values());

  for (INodeInstance instance : pending) {
    instance.prepare(config);
  }

  // One latch count per worker started below.
  doneSignal = new CountDownLatch(numThreads);
  int started = 0;
  while (started < numThreads) {
    threads.execute(new StreamWorker(pending));
    started++;
  }
}
 
源代码27 项目: rya   文件: LogEventWorkerTest.java
/**
 * Sends a delete event for a log that was never created and checks that the worker writes
 * nothing to the query event queue within 500 ms.
 */
@Test
public void notify_logDeleted_doesNotExist() throws Exception {
    // Flag used to stop the worker thread at the end of the test.
    final AtomicBoolean stopWorker = new AtomicBoolean(false);

    // Input queue (log events) and output queue (query events) of the worker under test.
    final BlockingQueue<LogEvent> incoming = new ArrayBlockingQueue<>(10);
    final BlockingQueue<QueryEvent> outgoing = new ArrayBlockingQueue<>(10);

    final LogEventWorker worker = new LogEventWorker(incoming, outgoing, 50, TimeUnit.MILLISECONDS, stopWorker);
    final Thread workerThread = new Thread(worker);
    workerThread.start();

    try {
        // Report the deletion of "rya", a log that was never created.
        incoming.offer(LogEvent.delete("rya"));

        // No query event is expected as a result, so the timed poll must come back empty.
        final QueryEvent produced = outgoing.poll(500, TimeUnit.MILLISECONDS);
        assertNull(produced);
    } finally {
        stopWorker.set(true);
        workerThread.join();
    }
}
 
源代码28 项目: disruptor   文件: MPMCBlockingQueueTest.java
/**
 * Fills a queue created with a requested capacity of 1 until it refuses further elements,
 * then checks that it holds two elements and returns the first one offered.
 */
@Test
public void testAsSynchronousQueue() {
    final int cap = 1;
    BlockingQueue<Integer> queue = new MPMCBlockingQueue<>(cap);

    // Keep offering 0 until the queue reports it is full.
    boolean accepted;
    do {
        accepted = queue.offer(Integer.valueOf(0));
    } while (accepted);

    Assert.assertFalse(queue.offer(Integer.valueOf(10)));

    // NOTE(review): a size of 2 for a requested capacity of 1 suggests the implementation
    // rounds the capacity up - confirm against MPMCBlockingQueue.
    Assert.assertEquals(2, queue.size());

    Assert.assertEquals(Integer.valueOf(0), queue.poll());
}
 
源代码29 项目: tomee   文件: EJBHomeProxyHandle.java
/**
 * Writes this handle to the given stream. The fields are written in a fixed order: a format
 * version byte, an optional executor description, the client metadata, the EJB metadata
 * fields and finally the server metadata.
 *
 * @param out stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
    // write out the version of the serialized data for future use
    out.writeByte(2);

    // The executor is described only when the handler has one that is not the global executor.
    final boolean hasExec = handler.executor != null && handler.executor != JNDIContext.globalExecutor();
    out.writeBoolean(hasExec);
    if (hasExec) {
        out.writeInt(handler.executor.getMaximumPoolSize());
        final BlockingQueue<Runnable> queue = handler.executor.getQueue();
        // Current size plus remaining capacity gives the queue's total capacity.
        out.writeInt(queue.size() + queue.remainingCapacity());
    }

    // The metadata is set on the client before the client writes itself out.
    handler.client.setMetaData(metaData);
    handler.client.writeExternal(out);

    // EJB metadata, written field by field.
    final EJBMetaDataImpl ejb = handler.ejb;
    out.writeObject(ejb.homeClass);
    out.writeObject(ejb.remoteClass);
    out.writeObject(ejb.keyClass);
    out.writeByte(ejb.type);
    out.writeUTF(ejb.deploymentID);
    out.writeShort(ejb.deploymentCode);

    // Same pattern as for the client: set the metadata, then let the server write itself.
    handler.server.setMetaData(metaData);
    handler.server.writeExternal(out);
    ///        out.writeObject( handler.primaryKey );
}
 
/**
 * Creates the pool with a rejection handler that throws a {@code RuntimeException} naming the
 * rejected task.
 *
 * @param corePoolSize    core pool size passed to the superclass
 * @param maximumPoolSize maximum pool size passed to the superclass
 * @param keepAliveTime   keep-alive time passed to the superclass
 * @param unit            unit of {@code keepAliveTime}
 * @param workQueue       queue holding tasks before they run
 */
ReportingThreadExecutionPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue) {
  super(
      corePoolSize,
      maximumPoolSize,
      keepAliveTime,
      unit,
      workQueue,
      new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
          // Report the rejection to the submitter instead of dropping the task silently.
          throw new RuntimeException("Rejected Task - " + task);
        }
      });
}
 
 类所在包
 同包方法