类java.util.concurrent.LinkedBlockingDeque源码实例Demo

下面列出了怎么用java.util.concurrent.LinkedBlockingDeque的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hottub   文件: RemovePollRace.java
Collection<Queue<Boolean>> concurrentQueues() {
    List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
    queues.add(new ConcurrentLinkedDeque<Boolean>());
    queues.add(new ConcurrentLinkedQueue<Boolean>());
    queues.add(new ArrayBlockingQueue<Boolean>(count, false));
    queues.add(new ArrayBlockingQueue<Boolean>(count, true));
    queues.add(new LinkedBlockingQueue<Boolean>());
    queues.add(new LinkedBlockingDeque<Boolean>());
    queues.add(new LinkedTransferQueue<Boolean>());

    // Following additional implementations are available from:
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    // queues.add(new SynchronizedLinkedListQueue<Boolean>());

    // Avoid "first fast, second slow" benchmark effect.
    Collections.shuffle(queues);
    return queues;
}
 
源代码2 项目: streams   文件: TwitterStreamProvider.java
@Override
public synchronized StreamsResultSet readCurrent() {

  StreamsResultSet current;
  synchronized (this) {
    Queue<StreamsDatum> drain = new LinkedBlockingDeque<>();
    drainTo(drain);
    current = new StreamsResultSet(drain);
    current.setCounter(new DatumStatusCounter());
    current.getCounter().add(countersCurrent);
    countersTotal.add(countersCurrent);
    countersCurrent = new DatumStatusCounter();
  }

  return current;
}
 
源代码3 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * Descending iterator ordering is reverse FIFO
 */
public void testDescendingIteratorOrdering() {
    final LinkedBlockingDeque q = new LinkedBlockingDeque();
    for (int iters = 0; iters < 100; ++iters) {
        q.add(new Integer(3));
        q.add(new Integer(2));
        q.add(new Integer(1));
        int k = 0;
        for (Iterator it = q.descendingIterator(); it.hasNext();) {
            assertEquals(++k, it.next());
        }

        assertEquals(3, k);
        q.remove();
        q.remove();
        q.remove();
    }
}
 
@Test
@SuppressWarnings("CallToThreadRun")
public void testCleanup() throws IOException {
	TestWriteRequest request = new TestWriteRequest();
	LinkedBlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
	deque.add(request);
	TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
	ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl(TASK_NAME, requestProcessor, deque);

	worker.close();
	worker.run();

	assertTrue(requestProcessor.isStopped());
	assertTrue(deque.isEmpty());
	assertTrue(request.isCancelled());
}
 
源代码5 项目: openjdk-8-source   文件: RemovePollRace.java
Collection<Queue<Boolean>> concurrentQueues() {
    List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
    queues.add(new ConcurrentLinkedDeque<Boolean>());
    queues.add(new ConcurrentLinkedQueue<Boolean>());
    queues.add(new ArrayBlockingQueue<Boolean>(count, false));
    queues.add(new ArrayBlockingQueue<Boolean>(count, true));
    queues.add(new LinkedBlockingQueue<Boolean>());
    queues.add(new LinkedBlockingDeque<Boolean>());
    queues.add(new LinkedTransferQueue<Boolean>());

    // Following additional implementations are available from:
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    // queues.add(new SynchronizedLinkedListQueue<Boolean>());

    // Avoid "first fast, second slow" benchmark effect.
    Collections.shuffle(queues);
    return queues;
}
 
源代码6 项目: wildfly-core   文件: JConsoleCLIPlugin.java
private ExecutorService createExecutor() {
    final ThreadGroup group = new ThreadGroup("management-client-thread");
    final ThreadFactory threadFactory = doPrivileged(new PrivilegedAction<JBossThreadFactory>() {
        public JBossThreadFactory run() {
            return new JBossThreadFactory(group, Boolean.FALSE, null, "%G " + executorCount.incrementAndGet() + "-%t", null, null);
        }
    });
    return EnhancedQueueExecutor.DISABLE_HINT ?
        new ThreadPoolExecutor(2, DEFAULT_MAX_THREADS, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory) :
        new EnhancedQueueExecutor.Builder()
            .setCorePoolSize(2)
            .setMaximumPoolSize(DEFAULT_MAX_THREADS)
            .setKeepAliveTime(60, TimeUnit.SECONDS)
            .setThreadFactory(threadFactory)
            .build();
}
 
源代码7 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * iterator iterates through all elements
 */
public void testIterator() throws InterruptedException {
    LinkedBlockingDeque q = populatedDeque(SIZE);
    Iterator it = q.iterator();
    int i;
    for (i = 0; it.hasNext(); i++)
        assertTrue(q.contains(it.next()));
    assertEquals(i, SIZE);
    assertIteratorExhausted(it);

    it = q.iterator();
    for (i = 0; it.hasNext(); i++)
        assertEquals(it.next(), q.take());
    assertEquals(i, SIZE);
    assertIteratorExhausted(it);
}
 
源代码8 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * drainTo(c) empties deque into another collection c
 */
public void testDrainTo() {
    LinkedBlockingDeque q = populatedDeque(SIZE);
    ArrayList l = new ArrayList();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(SIZE, l.size());
    for (int i = 0; i < SIZE; ++i)
        assertEquals(l.get(i), new Integer(i));
    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));
    l.clear();
    q.drainTo(l);
    assertEquals(0, q.size());
    assertEquals(2, l.size());
    for (int i = 0; i < 2; ++i)
        assertEquals(l.get(i), new Integer(i));
}
 
private void registerTransactionListener(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

    if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
    }
    RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
    RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
    if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
        throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
    }
    ((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
        annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
    ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
    log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
}
 
源代码10 项目: SpringMVC-Project   文件: ChatController.java
@Override
public void run() {
    LinkedBlockingDeque<MessageVo> messageQueue = MESSAGE_QUEUE_MAP.get(userId);
    BaseResponse<List<MessageVo>> response = new BaseResponse<>();
    List<MessageVo> list = Lists.newArrayList();

    MessageVo vo;
    try {
        if ((vo = messageQueue.poll(timeout, TimeUnit.MILLISECONDS)) != null) {
            list.add(vo);
            //一次最多取10条信息
            for (int i = 0; i < 9; i++) {
                vo = messageQueue.poll();
                if (vo == null) {
                    break;
                }
                list.add(vo);
            }
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    response.setData(list);
    deferredResult.setResult(response);
}
 
源代码11 项目: linstor-server   文件: DrbdEventService.java
@Inject
public DrbdEventService(
    final ErrorReporter errorReporterRef,
    final DrbdStateTracker trackerRef,
    final CoreModule.ResourceDefinitionMap rscDfnMap
)
{
    try
    {
        instanceName = new ServiceName(INSTANCE_PREFIX + INSTANCE_COUNT.incrementAndGet());
        eventDeque = new LinkedBlockingDeque<>(EVENT_QUEUE_DEFAULT_SIZE);
        demonHandler = new DaemonHandler(eventDeque, DRBDSETUP_COMMAND, "events2", "all");
        running = false;
        errorReporter = errorReporterRef;
        tracker = trackerRef;
        eventsMonitor = new DrbdEventsMonitor(trackerRef, errorReporterRef, rscDfnMap);
    }
    catch (InvalidNameException invalidNameExc)
    {
        throw new ImplementationError(invalidNameExc);
    }
}
 
源代码12 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * offer transfers elements across Executor tasks
 */
public void testOfferInExecutor() {
    final LinkedBlockingDeque q = new LinkedBlockingDeque(2);
    q.add(one);
    q.add(two);
    final CheckedBarrier threadsStarted = new CheckedBarrier(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(executor)) {
        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                assertFalse(q.offer(three));
                threadsStarted.await();
                assertTrue(q.offer(three, LONG_DELAY_MS, MILLISECONDS));
                assertEquals(0, q.remainingCapacity());
            }});

        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                assertSame(one, q.take());
            }});
    }
}
 
源代码13 项目: sofa-ark   文件: ThreadPoolUtils.java
/**
 * Build Queue
 *
 * @param size size of queue
 * @param isPriority whether use priority queue or not
 * @return queue
 */
public static BlockingQueue<Runnable> buildQueue(int size, boolean isPriority) {
    BlockingQueue<Runnable> queue;
    if (size == 0) {
        queue = new SynchronousQueue<>();
    } else {
        if (isPriority) {
            queue = size < 0 ? new PriorityBlockingQueue<Runnable>()
                : new PriorityBlockingQueue<Runnable>(size);
        } else {
            queue = size < 0 ? new LinkedBlockingDeque<Runnable>()
                : new LinkedBlockingDeque<Runnable>(size);
        }
    }
    return queue;
}
 
源代码14 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * timed offerLast times out if full and elements not taken
 */
public void testTimedOfferLast() throws InterruptedException {
    final LinkedBlockingDeque q = new LinkedBlockingDeque(2);
    final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
    Thread t = newStartedThread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            q.putLast(new Object());
            q.putLast(new Object());
            long startTime = System.nanoTime();
            assertFalse(q.offerLast(new Object(), timeoutMillis(), MILLISECONDS));
            assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
            pleaseInterrupt.countDown();
            try {
                q.offerLast(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
                shouldThrow();
            } catch (InterruptedException success) {}
        }});

    await(pleaseInterrupt);
    assertThreadStaysAlive(t);
    t.interrupt();
    awaitTermination(t);
}
 
源代码15 项目: ignite   文件: BlockingQueueTest.java
/**
 * Main method.
 *
 * @param args Parameters.
 * @throws Exception If failed.
 */
public static void main(String[] args) throws Exception {
    for (int i = 0; i < RETRIES; i++) {
        X.println(">>>");
        X.println(">>> Executing single threaded attempt: " + i);
        X.println(">>>");

        testBlockingQueue("single-threaded-linked-queue", new LinkedBlockingQueue<>());
        testBlockingQueue("single-threaded-linked-deque", new LinkedBlockingDeque<>());
        testBlockingQueue("single-threaded-array-queue", new ArrayBlockingQueue<>(CNT + 10));
    }

    for (int i = 0; i < RETRIES; i++) {
        X.println(">>>");
        X.println(">>> Executing multi-threaded attempt: " + i);
        X.println(">>>");

        testBlockingQueueMultithreaded("multi-threaded-linked-queue", new LinkedBlockingQueue<>());
        testBlockingQueueMultithreaded("multi-threaded-linked-deque", new LinkedBlockingDeque<>());
        testBlockingQueueMultithreaded("multi-threaded-array-queue", new ArrayBlockingQueue<>(
            THREAD_CNT * CNT + 100));
    }
}
 
源代码16 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * poll succeeds unless empty
 */
public void testPoll() {
    LinkedBlockingDeque q = populatedDeque(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        assertEquals(i, q.poll());
    }
    assertNull(q.poll());
}
 
源代码17 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * toArray contains all elements in FIFO order
 */
public void testToArray() throws InterruptedException {
    LinkedBlockingDeque q = populatedDeque(SIZE);
    Object[] o = q.toArray();
    for (int i = 0; i < o.length; i++)
        assertSame(o[i], q.poll());
}
 
源代码18 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * clear removes all elements
 */
public void testClear() {
    LinkedBlockingDeque q = populatedDeque(SIZE);
    q.clear();
    assertTrue(q.isEmpty());
    assertEquals(0, q.size());
    assertEquals(SIZE, q.remainingCapacity());
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(one));
    q.clear();
    assertTrue(q.isEmpty());
}
 
源代码19 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * Returns a new deque of given size containing consecutive
 * Integers 0 ... n - 1.
 */
private LinkedBlockingDeque<Integer> populatedDeque(int n) {
    LinkedBlockingDeque<Integer> q =
        new LinkedBlockingDeque<Integer>(n);
    assertTrue(q.isEmpty());
    for (int i = 0; i < n; i++)
        assertTrue(q.offer(new Integer(i)));
    assertFalse(q.isEmpty());
    assertEquals(0, q.remainingCapacity());
    assertEquals(n, q.size());
    assertEquals((Integer) 0, q.peekFirst());
    assertEquals((Integer) (n - 1), q.peekLast());
    return q;
}
 
源代码20 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * Deque transitions from empty to full when elements added
 */
public void testEmptyFull() {
    LinkedBlockingDeque q = new LinkedBlockingDeque(2);
    assertTrue(q.isEmpty());
    assertEquals("should have room for 2", 2, q.remainingCapacity());
    q.add(one);
    assertFalse(q.isEmpty());
    q.add(two);
    assertFalse(q.isEmpty());
    assertEquals(0, q.remainingCapacity());
    assertFalse(q.offer(three));
}
 
源代码21 项目: spring-cloud-stream   文件: TestSupportBinder.java
private BlockingQueue<Message<?>> register(MessageChannel channel,
		boolean useNativeEncoding) {
	// we need to add this interceptor to ensure MessageCollector's compatibility
	// with
	// previous versions of SCSt when native encoding is disabled.
	if (!useNativeEncoding) {
		((AbstractMessageChannel) channel)
				.addInterceptor(new InboundMessageConvertingInterceptor());
	}
	LinkedBlockingDeque<Message<?>> result = new LinkedBlockingDeque<>();
	Assert.isTrue(!this.results.containsKey(channel),
			"Channel [" + channel + "] was already bound");
	this.results.put(channel, result);
	return result;
}
 
源代码22 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * Constructor throws IllegalArgumentException if capacity argument nonpositive
 */
public void testConstructor2() {
    try {
        new LinkedBlockingDeque(0);
        shouldThrow();
    } catch (IllegalArgumentException success) {}
}
 
源代码23 项目: samza   文件: TestAzureBlobOutputStream.java
@Before
public void setup() throws Exception {
  threadPool = new ThreadPoolExecutor(1, 1, 60,
      TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());


  mockByteArrayOutputStream = spy(new ByteArrayOutputStream(THRESHOLD));

  mockBlobAsyncClient = PowerMockito.mock(BlockBlobAsyncClient.class);

  when(mockBlobAsyncClient.getBlobUrl()).thenReturn("https://samza.blob.core.windows.net/fake-blob-url");

  mockMetrics = mock(AzureBlobWriterMetrics.class);

  mockCompression = mock(Compression.class);
  doReturn(COMPRESSED_BYTES).when(mockCompression).compress(BYTES);

  BlobMetadataGenerator mockBlobMetadataGenerator = mock(BlobMetadataGenerator.class);
  doAnswer(invocation -> {
    BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
    String streamName = blobMetadataContext.getStreamName();
    Long blobSize = blobMetadataContext.getBlobSize();
    Long numberOfRecords = blobMetadataContext.getNumberOfMessagesInBlob();
    Map<String, String> metadataProperties = new HashMap<>();
    metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
    metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
    metadataProperties.put(BLOB_RECORD_NUMBER_METADATA, Long.toString(numberOfRecords));
    return metadataProperties;
  }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());

  azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
      blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
      60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));

  doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
  doNothing().when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
  doNothing().when(azureBlobOutputStream).clearAndMarkClosed();
  doReturn(mockBlobMetadataGenerator).when(azureBlobOutputStream).getBlobMetadataGenerator();
}
 
源代码24 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * peekLast returns element inserted with addLast
 */
public void testAddLast() {
    LinkedBlockingDeque q = populatedDeque(3);
    q.pollLast();
    q.addLast(four);
    assertSame(four, q.peekLast());
}
 
源代码25 项目: bcm-android   文件: Util.java
public static ExecutorService newThreadedExecutor(int coreSize) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());

    executor.execute(new Runnable() {
        @Override
        public void run() {
            Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
        }
    });

    return executor;
}
 
源代码26 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * all elements successfully putLast are contained
 */
public void testPutLast() throws InterruptedException {
    LinkedBlockingDeque q = new LinkedBlockingDeque(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        Integer x = new Integer(i);
        q.putLast(x);
        assertTrue(q.contains(x));
    }
    assertEquals(0, q.remainingCapacity());
}
 
源代码27 项目: sofa-registry   文件: DefaultEventBus.java
/**
 * Instantiates a new Default event bus.
 *
 * @param config the config
 */
public DefaultEventBus(RegistryClientConfig config) {
    this.config = config;
    this.executor = new ThreadPoolExecutor(config.getObserverThreadCoreSize(),
        config.getObserverThreadMaxSize(), 0, TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(config.getObserverThreadQueueLength()),
        new NamedThreadFactory("DefaultEventBusThread"));
}
 
源代码28 项目: nettythrift   文件: SocketConnectionPool.java
/**
 * 
 * @param socketFactory
 *            - 真正负责创建连接的socketFactory--本类专注做连接池,把创建连接的细节抛给外部socketFactory
 * @param maxIdle
 * @param maxTotal
 * @param maxWaitMills
 * @param blockWhenExhausted
 * @param maxIdleTime
 */
public SocketConnectionPool(SocketFactory socketFactory, int maxIdle, int maxTotal, long maxWaitMills,
		boolean blockWhenExhausted, final long maxIdleTime) {
	if (socketFactory instanceof SocketConnectionPool) {
		throw new IllegalArgumentException("socketFactory must not a SocketConnectionPool!");
	}
	this.socketFactory = socketFactory;
	this.maxIdle = maxIdle;
	this.maxTotal = maxTotal;
	this.maxWaitMills = maxWaitMills;
	this.blockWhenExhausted = blockWhenExhausted;

	idleObjects = new LinkedBlockingDeque<>(maxTotal);
	idleCheckTimer.scheduleAtFixedRate(new TimerTask() {
		@Override
		public void run() {
			int size = idleObjects.size();
			if (size < 1) {
				return;
			}
			// toArray() copy 一个副本,避免
			SocketWrapper[] scs = idleObjects.toArray(new SocketWrapper[size]);
			for (int i = 0; i < scs.length; i++) {
				SocketWrapper sc = scs[i];
				if (sc != null && !sc.isWorking && System.currentTimeMillis() - sc.lastUseTime >= maxIdleTime) {
					// System.out.println("try删除空闲太久的连接: " + sc);
					destroy(sc);
				}
			}
		}
	}, 3000, DEFAULT_IDLE_CHECK_GAP);// 检查的开始时间 和 时间间隔--按需调整
}
 
/**
 * Instantiates a new default rejected execution handler.
 *
 * @param synThreadPoolExecutor the syn thread pool executor
 */
public DefaultRejectedExecutionHandler(SynThreadPoolExecutor<SynRunnableIntf> synThreadPoolExecutor)
{
	String threadname = synThreadPoolExecutor.getName() + "-rejected-handler-" + timerSeq.incrementAndGet();
	LinkedBlockingDeque<SynRunnableIntf> deque = new LinkedBlockingDeque<>();
	submitTaskRunnable = new SubmitTaskRunnable<>(deque, synThreadPoolExecutor);
	submitTaskThread = new Thread(submitTaskRunnable, threadname);
	submitTaskThread.start();

	//		myTimerTask = new MyTimerTask(new LinkedBlockingQueue<Runnable>(), synThreadPoolExecutor);
	//		timer.schedule(myTimerTask, 1000);
}
 
源代码30 项目: DDMQ   文件: PullBuffer.java
public boolean addDelayRequest(DelayRequest delayRequest) {
    Deque<DelayRequest> waitQueue = waitQueueMap.computeIfAbsent(
            getRequestTopic(delayRequest.getRequest()), topic -> new LinkedBlockingDeque<>(MAX_WAIT_REQUEST_QUEUE_SIZE));

    if (!waitQueue.offer(delayRequest)) {
        doCleanWaitQueue(waitQueue);
        return waitQueue.offer(delayRequest);
    } else {
        return true;
    }
}
 
 类所在包
 同包方法