java.util.concurrent.BlockingDeque源码实例Demo

类java.util.concurrent.BlockingDeque源码实例Demo

下面列出了 java.util.concurrent.BlockingDeque 类的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: linstor-server   文件: OutputProxy.java
/**
 * Creates a proxy around the given input stream.
 *
 * @param in           stream stored as the data source
 * @param dequeRef     deque stored for later use by this proxy
 * @param delimiterPrm byte stored as the delimiter
 * @param useOutPrm    value stored in the {@code useOut} flag
 */
public OutputProxy(
    final InputStream in,
    final BlockingDeque<Event> dequeRef,
    final byte delimiterPrm,
    final boolean useOutPrm
)
{
    // Caller-supplied collaborators and settings.
    this.dataIn = in;
    this.deque = dequeRef;
    this.delimiter = delimiterPrm;
    this.useOut = useOutPrm;

    // Fresh buffer of the initial size; both cursors start at zero.
    this.data = new byte[INIT_DATA_SIZE];
    this.dataPos = 0;
    this.dataLimit = 0;

    this.shutdown = false;
}
 
源代码2 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * A thread blocked in takeFirst() on an empty deque is released with
 * InterruptedException when interrupted, and its interrupt status is
 * clear afterwards.
 */
public void testTakeFirstFromEmptyBlocksInterruptibly() {
    final BlockingDeque deque = new LinkedBlockingDeque();
    final CountDownLatch started = new CountDownLatch(1);
    final CheckedRunnable taker = new CheckedRunnable() {
        public void realRun() {
            started.countDown();
            try {
                deque.takeFirst();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread worker = newStartedThread(taker);

    // Wait until the worker is running, confirm it is still alive, then interrupt it.
    await(started);
    assertThreadStaysAlive(worker);
    worker.interrupt();
    awaitTermination(worker);
}
 
源代码3 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * A thread blocked in takeLast() on an empty deque is released with
 * InterruptedException when interrupted, and its interrupt status is
 * clear afterwards.
 */
public void testTakeLastFromEmptyBlocksInterruptibly() {
    final BlockingDeque deque = new LinkedBlockingDeque();
    final CountDownLatch started = new CountDownLatch(1);
    final CheckedRunnable taker = new CheckedRunnable() {
        public void realRun() {
            started.countDown();
            try {
                deque.takeLast();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread worker = newStartedThread(taker);

    // Wait until the worker is running, confirm it is still alive, then interrupt it.
    await(started);
    assertThreadStaysAlive(worker);
    worker.interrupt();
    awaitTermination(worker);
}
 
源代码4 项目: replicator   文件: StreamsBuilder.java
/**
 * Private constructor that copies every argument into the field of the same name.
 *
 * @param threads        stored in {@code threads}
 * @param tasks          stored in {@code tasks}
 * @param partitioner    stored in {@code partitioner}
 * @param queueType      {@link BlockingDeque} implementation class, stored in {@code queueType}
 * @param queueSize      stored in {@code queueSize}
 * @param queueTimeout   stored in {@code queueTimeout}; unit is not visible here
 * @param dataSupplierFn stored in {@code dataSupplierFn}
 * @param filter         stored in {@code filter}
 * @param process        stored in {@code process}
 * @param to             stored in {@code to}
 * @param post           stored in {@code post}
 */
private StreamsBuilder(
        int threads,
        int tasks,
        BiFunction<Input, Integer, Integer> partitioner,
        Class<? extends BlockingDeque> queueType,
        int queueSize,
        long queueTimeout,
        Function<Integer, Input> dataSupplierFn,
        Predicate<Input> filter,
        Function<Input, Output> process,
        Function<Output, Boolean> to,
        BiConsumer<Input, Integer> post) {
    // Thread/task counts and the partitioning function.
    this.threads = threads;
    this.tasks = tasks;
    this.partitioner = partitioner;

    // Queue settings.
    this.queueType = queueType;
    this.queueSize = queueSize;
    this.queueTimeout = queueTimeout;

    // Functional stages supplied by the caller.
    this.dataSupplierFn = dataSupplierFn;
    this.filter = filter;
    this.process = process;
    this.to = to;
    this.post = post;
}
 
源代码5 项目: Diorite   文件: YamlCollectionCreator.java
/**
 * Registers, via {@code safePut}, a factory for each supported collection and map type.
 * <p>
 * Every factory is an {@link IntFunction}. Constructor references pass the int
 * argument to the constructor; the lambdas below ignore it.
 *
 * @param map registry that receives the type-to-factory entries
 */
static void putAllCollections(Map<Class<?>, IntFunction<?>> map)
{
    // Concrete collection classes.
    safePut(map, ArrayList.class, ArrayList::new);
    safePut(map, HashSet.class, LinkedHashSet::new);
    safePut(map, Properties.class, ignored -> new Properties());
    safePut(map, Hashtable.class, Hashtable::new);

    // Collection interfaces mapped to a default implementation.
    safePut(map, Collection.class, ArrayList::new);
    safePut(map, Set.class, LinkedHashSet::new);
    safePut(map, List.class, ArrayList::new);
    safePut(map, SortedSet.class, ignored -> new TreeSet<>());
    safePut(map, Queue.class, ignored -> new ConcurrentLinkedQueue<>());
    safePut(map, Deque.class, ignored -> new ConcurrentLinkedDeque<>());
    safePut(map, BlockingQueue.class, ignored -> new LinkedBlockingQueue<>());
    safePut(map, BlockingDeque.class, ignored -> new LinkedBlockingDeque<>());


    // Concrete map classes.
    safePut(map, HashMap.class, LinkedHashMap::new);
    safePut(map, LinkedHashMap.class, LinkedHashMap::new);
    safePut(map, ConcurrentHashMap.class, ConcurrentHashMap::new);

    // Map interfaces mapped to a default implementation.
    safePut(map, Map.class, LinkedHashMap::new);
    safePut(map, ConcurrentMap.class, ignored -> new ConcurrentSkipListMap<>());
    safePut(map, ConcurrentNavigableMap.class, ignored -> new ConcurrentSkipListMap<>());
    safePut(map, SortedMap.class, ignored -> new TreeMap<>());
}
 
源代码6 项目: baleen   文件: SharedMemoryQueueResource.java
/**
 * Create a (blocking) supplier for the given topic.
 *
 * <p>The returned supplier polls the head of the topic's queue with a one-minute timeout and
 * keeps polling until an element is available.
 *
 * <p>If the calling thread is interrupted, the supplier restores the interrupt flag and throws
 * {@link IllegalStateException}. Retrying with the flag set would make every later timed poll
 * fail immediately (as it does for {@code LinkedBlockingDeque}), so the loop would spin forever
 * without ever returning.
 *
 * @param topic the topic whose queue is read
 * @return the supplier
 */
public Supplier<String> createBlockingSupplier(final String topic) {
  final BlockingDeque<String> queue = getQueue(topic);
  return () -> {
    while (true) {
      try {
        final String s = queue.pollFirst(1, TimeUnit.MINUTES);
        if (s != null) {
          return s;
        }
        // Timed out with nothing available: poll again.
      } catch (final InterruptedException e) {
        // Keep the interrupt visible to the caller, then stop instead of busy-spinning.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(
            "Interrupted while waiting for a message on topic " + topic, e);
      }
    }
  };
}
 
源代码7 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * takeFirst() on an empty deque blocks, and an interrupt delivered while
 * blocked surfaces as InterruptedException with the interrupt status
 * left clear.
 */
public void testTakeFirstFromEmptyBlocksInterruptibly() {
    final BlockingDeque emptyDeque = new LinkedBlockingDeque();
    final CountDownLatch running = new CountDownLatch(1);
    final CheckedRunnable blockedTaker = new CheckedRunnable() {
        public void realRun() {
            running.countDown();
            try {
                emptyDeque.takeFirst();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread taker = newStartedThread(blockedTaker);

    await(running);
    assertThreadStaysAlive(taker);
    taker.interrupt();
    awaitTermination(taker);
}
 
源代码8 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * takeLast() on an empty deque blocks, and an interrupt delivered while
 * blocked surfaces as InterruptedException with the interrupt status
 * left clear.
 */
public void testTakeLastFromEmptyBlocksInterruptibly() {
    final BlockingDeque emptyDeque = new LinkedBlockingDeque();
    final CountDownLatch running = new CountDownLatch(1);
    final CheckedRunnable blockedTaker = new CheckedRunnable() {
        public void realRun() {
            running.countDown();
            try {
                emptyDeque.takeLast();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread taker = newStartedThread(blockedTaker);

    await(running);
    assertThreadStaysAlive(taker);
    taker.interrupt();
    awaitTermination(taker);
}
 
源代码9 项目: type-parser   文件: DynamicParsers.java
/**
 * Picks a default concrete collection for the given type.
 * <p>
 * Each check tests whether the interface is assignable from {@code collectionType}.
 * The checks run in a fixed order and the first match wins. If none match, an
 * {@link ArrayList} is returned.
 *
 * @param collectionType the type to find an implementation for
 * @return a new, empty collection instance
 */
private <T> Collection<T> instantiateCollectionFromInterface(Class<? extends T> collectionType) {
    if (List.class.isAssignableFrom(collectionType)) {
        return new ArrayList<T>();
    }

    // SortedSet must be tested before the more general Set.
    if (SortedSet.class.isAssignableFrom(collectionType)) {
        return new TreeSet<T>();
    }
    if (Set.class.isAssignableFrom(collectionType)) {
        return new LinkedHashSet<T>();
    }

    // BlockingDeque must be tested before the more general Deque.
    if (BlockingDeque.class.isAssignableFrom(collectionType)) {
        return new LinkedBlockingDeque<T>();
    }
    if (Deque.class.isAssignableFrom(collectionType)) {
        return new ArrayDeque<T>();
    }

    // BlockingQueue must be tested before the more general Queue.
    if (BlockingQueue.class.isAssignableFrom(collectionType)) {
        return new LinkedBlockingDeque<T>();
    }
    if (Queue.class.isAssignableFrom(collectionType)) {
        return new LinkedList<T>();
    }

    // Nothing matched.
    return new ArrayList<T>();
}
 
源代码10 项目: redis-manager   文件: InstallationLogContainer.java
/**
 * Removes the log lines currently queued for the given cluster and returns them in order.
 * <p>
 * Lines are taken from the head of the cluster's deque until it reports empty.
 * Null or empty lines are skipped. If the thread is interrupted while polling,
 * the stack trace is printed, the interrupt flag is restored so callers can see
 * it, and the lines collected so far are returned.
 *
 * @param clusterName name of the cluster whose log deque is drained
 * @return the collected log lines; never {@code null}, possibly empty
 */
public static List<String> getLogs(String clusterName) {
    List<String> logs = new LinkedList<>();
    BlockingDeque<String> logContainer = getLogDeque(clusterName);
    try {
        while (!logContainer.isEmpty()) {
            // Timed poll: the deque may have been emptied since the isEmpty() check.
            String log = logContainer.pollFirst(1, TimeUnit.SECONDS);
            if (!Strings.isNullOrEmpty(log)) {
                logs.add(log);
            }
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Catching InterruptedException clears the flag; set it again for the caller.
        Thread.currentThread().interrupt();
    }
    return logs;
}
 
源代码11 项目: redis-manager   文件: InstallationLogContainer.java
/**
 * Returns the log deque registered for the given cluster, creating and registering
 * an unbounded {@link LinkedBlockingDeque} on first use.
 * <p>
 * The lookup and the registration are a single {@code computeIfAbsent} call, so an
 * existing mapping is never overwritten. With a concurrent map implementation, two
 * callers cannot end up with different deques for the same cluster.
 * NOTE(review): assumes {@code INSTALLATION_LOG} is a {@code java.util.Map}; confirm
 * against its declaration.
 *
 * @param clusterName name of the cluster
 * @return the deque holding that cluster's installation log lines
 */
private static BlockingDeque<String> getLogDeque(String clusterName) {
    return INSTALLATION_LOG.computeIfAbsent(clusterName, name -> new LinkedBlockingDeque<String>());
}
 
源代码12 项目: hermes   文件: DefaultCommitter.java
/**
 * Merges queued operations into {@code op} for as long as the combined record
 * count stays within {@code maxRecords}.
 * <p>
 * The head of the queue is peeked once per iteration and checked for null. A
 * separate {@code isEmpty()} test followed by {@code peek()} could hit a null
 * head if the queue is drained in between.
 *
 * @param op         operation that absorbs the queued operations
 * @param opQueue    queue of pending operations; merged heads are removed from it
 * @param maxRecords upper bound on the number of records in {@code op}
 * @return {@code op}, after merging
 */
private PartitionOperation<T> mergeMoreOperation(PartitionOperation<T> op,
      BlockingDeque<PartitionOperation<T>> opQueue, int maxRecords) {

	while (op.getRecords().size() < maxRecords) {
		PartitionOperation<T> head = opQueue.peek();
		if (head == null) {
			// Queue is empty.
			break;
		}
		if (op.getRecords().size() + head.getRecords().size() > maxRecords) {
			// The next operation would exceed the limit; leave it queued.
			break;
		}

		PartitionOperation<T> moreOp = opQueue.poll();
		if (moreOp == null) {
			// The head was removed elsewhere after the peek.
			break;
		}
		op.merge(moreOp);
	}

	return op;
}
 
源代码13 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * A thread whose interrupt status is already set gets InterruptedException
 * from takeFirst() on an empty deque, and its interrupt status is clear
 * afterwards.
 */
public void testTakeFirstFromEmptyAfterInterrupt() {
    final BlockingDeque deque = new LinkedBlockingDeque();
    final CheckedRunnable preInterruptedTaker = new CheckedRunnable() {
        public void realRun() {
            // Set the interrupt status before attempting to take.
            Thread.currentThread().interrupt();
            try {
                deque.takeFirst();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread worker = newStartedThread(preInterruptedTaker);

    awaitTermination(worker);
}
 
源代码14 项目: openjdk-jdk9   文件: LinkedBlockingDequeTest.java
/**
 * A thread whose interrupt status is already set gets InterruptedException
 * from takeLast() on an empty deque, and its interrupt status is clear
 * afterwards.
 */
public void testTakeLastFromEmptyAfterInterrupt() {
    final BlockingDeque deque = new LinkedBlockingDeque();
    final CheckedRunnable preInterruptedTaker = new CheckedRunnable() {
        public void realRun() {
            // Set the interrupt status before attempting to take.
            Thread.currentThread().interrupt();
            try {
                deque.takeLast();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread worker = newStartedThread(preInterruptedTaker);

    awaitTermination(worker);
}
 
源代码15 项目: rawhttp   文件: RawHttpDuplexOptions.java
/**
 * Creates an options instance holding the given values as-is.
 *
 * @param client              stored in {@code client}
 * @param pingPeriod          stored in {@code pingPeriod}
 * @param pingScheduler       stored in {@code pingScheduler}
 * @param messageQueueFactory supplier of message deques, stored in {@code messageQueueFactory}
 */
public RawHttpDuplexOptions(RawHttpClient<?> client, Duration pingPeriod,
                            ScheduledExecutorService pingScheduler,
                            Supplier<BlockingDeque<MessageSender.Message>> messageQueueFactory) {
    // Plain field initialisation; the assignments are independent of each other.
    this.messageQueueFactory = messageQueueFactory;
    this.pingScheduler = pingScheduler;
    this.pingPeriod = pingPeriod;
    this.client = client;
}
 
源代码16 项目: rawhttp   文件: RawHttpDuplexOptions.java
/**
 * Creates a {@link RawHttpDuplexOptions} from this builder's settings.
 * <p>
 * Each setting goes through {@code getOrDefault} with a fallback supplier:
 * a new {@code TcpRawHttpClient}, a ping period of 5 seconds, a single-thread
 * scheduled executor, and {@code Builder.defaultMessageQueue}.
 *
 * @return new instance
 */
public RawHttpDuplexOptions build() {
    RawHttpClient<?> resolvedClient = getOrDefault(this.client, TcpRawHttpClient::new);
    Duration resolvedPingPeriod = getOrDefault(this.pingPeriod, () -> Duration.ofSeconds(5));
    ScheduledExecutorService resolvedScheduler =
            getOrDefault(this.pingScheduler, Executors::newSingleThreadScheduledExecutor);
    Supplier<BlockingDeque<MessageSender.Message>> resolvedQueueFactory =
            getOrDefault(this.messageQueueFactory, Builder::defaultMessageQueue);

    return new RawHttpDuplexOptions(
            resolvedClient, resolvedPingPeriod, resolvedScheduler, resolvedQueueFactory);
}
 
源代码17 项目: baleen   文件: SharedMemoryQueueResource.java
/**
 * Create a blocking consumer for the given topic.
 *
 * <p>The returned consumer appends each value to the tail of the topic's queue. It retries in
 * one-minute slices until the queue accepts the value.
 *
 * <p>If the calling thread is interrupted, the consumer restores the interrupt flag and throws
 * {@link IllegalStateException}. Retrying with the flag set would make every later timed offer
 * fail immediately (as it does for {@code LinkedBlockingDeque}), so the loop would spin forever
 * without ever enqueuing the value.
 *
 * @param topic the topic whose queue is written
 * @return the consumer
 */
public Consumer<String> createBlockingConsumer(final String topic) {
  final BlockingDeque<String> queue = getQueue(topic);
  return t -> {
    boolean accepted = false;
    while (!accepted) {
      try {
        accepted = queue.offerLast(t, 1, TimeUnit.MINUTES);
      } catch (final InterruptedException e) {
        // Keep the interrupt visible to the caller, then stop instead of busy-spinning.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(
            "Interrupted while offering a message to topic " + topic, e);
      }
    }
  };
}
 
源代码18 项目: baleen   文件: SharedMemoryQueueResource.java
/**
 * Returns the queue registered for the topic. On first request it creates a
 * {@link LinkedBlockingDeque} bounded by {@code queueCapacity} and registers it.
 * The method is synchronized on this instance.
 *
 * @param topic the topic name used as the lookup key
 * @return the existing or newly created queue
 */
private synchronized BlockingDeque<String> getOrCreateQueue(final String topic) {
  final BlockingDeque<String> existing = queues.get(topic);
  if (existing != null) {
    return existing;
  }

  final BlockingDeque<String> created = new LinkedBlockingDeque<>(queueCapacity);
  queues.put(topic, created);
  return created;
}
 
源代码19 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * takeFirst() on an empty deque throws InterruptedException when the
 * calling thread's interrupt status is already set, and leaves the
 * interrupt status clear.
 */
public void testTakeFirstFromEmptyAfterInterrupt() {
    final BlockingDeque emptyDeque = new LinkedBlockingDeque();
    final CheckedRunnable interruptedTaker = new CheckedRunnable() {
        public void realRun() {
            Thread.currentThread().interrupt();
            try {
                emptyDeque.takeFirst();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread taker = newStartedThread(interruptedTaker);

    awaitTermination(taker);
}
 
源代码20 项目: j2objc   文件: LinkedBlockingDequeTest.java
/**
 * takeLast() on an empty deque throws InterruptedException when the
 * calling thread's interrupt status is already set, and leaves the
 * interrupt status clear.
 */
public void testTakeLastFromEmptyAfterInterrupt() {
    final BlockingDeque emptyDeque = new LinkedBlockingDeque();
    final CheckedRunnable interruptedTaker = new CheckedRunnable() {
        public void realRun() {
            Thread.currentThread().interrupt();
            try {
                emptyDeque.takeLast();
                shouldThrow();
            } catch (InterruptedException success) {}
            assertFalse(Thread.interrupted());
        }};
    final Thread taker = newStartedThread(interruptedTaker);

    awaitTermination(taker);
}
 
ChannelStateWriteRequestExecutorImpl(
		String taskName,
		ChannelStateWriteRequestDispatcher dispatcher,
		BlockingDeque<ChannelStateWriteRequest> deque) {
	this.taskName = taskName;
	this.dispatcher = dispatcher;
	this.deque = deque;
	this.thread = new Thread(this::run, "Channel state writer " + taskName);
	this.thread.setDaemon(true);
}
 
源代码22 项目: reef   文件: AggregateContainer.java
/**
 * Creates a container holding the given collaborators as-is.
 *
 * @param heartBeatTriggerManager   stored in {@code heartBeatTriggerManager}
 * @param kryoUtils                 stored in {@code kryoUtils}
 * @param workerReportsQueue        deque of byte arrays, stored in {@code workerReportsQueue}
 * @param taskletAggregationRequest stored in {@code taskletAggregationRequest}
 */
AggregateContainer(final HeartBeatTriggerManager heartBeatTriggerManager,
                   final KryoUtils kryoUtils,
                   final BlockingDeque<byte[]> workerReportsQueue,
                   final TaskletAggregationRequest taskletAggregationRequest) {
  // Plain field initialisation; the assignments are independent of each other.
  this.taskletAggregationRequest = taskletAggregationRequest;
  this.workerReportsQueue = workerReportsQueue;
  this.kryoUtils = kryoUtils;
  this.heartBeatTriggerManager = heartBeatTriggerManager;
}
 
源代码23 项目: linstor-server   文件: DaemonHandler.java
public DaemonHandler(final BlockingDeque<Event> dequeRef, final String... command)
{
    deque = dequeRef;
    processBuilder = new ProcessBuilder(command);
    processBuilder.redirectError(Redirect.PIPE);
}
 
源代码24 项目: nuls-v2   文件: Chain.java
/**
 * Returns the {@code packableHashQueue} field.
 * The deque itself is returned, not a copy.
 *
 * @return the deque held in {@code packableHashQueue}
 */
public BlockingDeque<ByteArrayWrapper> getPackableHashQueue() {
    return packableHashQueue;
}
 
源代码25 项目: nuls-v2   文件: Chain.java
/**
 * Replaces the {@code packableHashQueue} field with the given deque.
 * The reference is stored as-is; no copy is made.
 *
 * @param packableHashQueue the deque to store
 */
public void setPackableHashQueue(BlockingDeque<ByteArrayWrapper> packableHashQueue) {
    this.packableHashQueue = packableHashQueue;
}
 
源代码26 项目: nuls-v2   文件: Chain.java
/**
 * Returns the {@code unverifiedQueue} field.
 * The deque itself is returned, not a copy.
 *
 * @return the deque held in {@code unverifiedQueue}
 */
public BlockingDeque<TransactionNetPO> getUnverifiedQueue() {
    return unverifiedQueue;
}
 
源代码27 项目: nuls-v2   文件: Chain.java
/**
 * Replaces the {@code unverifiedQueue} field with the given deque.
 * The reference is stored as-is; no copy is made.
 *
 * @param unverifiedQueue the deque to store
 */
public void setUnverifiedQueue(BlockingDeque<TransactionNetPO> unverifiedQueue) {
    this.unverifiedQueue = unverifiedQueue;
}
 
源代码28 项目: nuls-v2   文件: NodeGroup.java
/**
 * Returns the {@code cacheMsgQueue} field.
 * The deque itself is returned, not a copy.
 *
 * @return the deque held in {@code cacheMsgQueue}
 */
public BlockingDeque<RpcCacheMessage> getCacheMsgQueue() {
    return cacheMsgQueue;
}
 
源代码29 项目: nuls-v2   文件: NodeGroup.java
/**
 * Replaces the {@code cacheMsgQueue} field with the given deque.
 * The reference is stored as-is; no copy is made.
 *
 * @param cacheMsgQueue the deque to store
 */
public void setCacheMsgQueue(BlockingDeque<RpcCacheMessage> cacheMsgQueue) {
    this.cacheMsgQueue = cacheMsgQueue;
}
 
源代码30 项目: nuls-v2   文件: Node.java
/**
 * Returns the {@code cacheSendMsgQueue} field.
 * The deque itself is returned, not a copy.
 *
 * @return the deque held in {@code cacheSendMsgQueue}
 */
public BlockingDeque<PeerCacheMessage> getCacheSendMsgQueue() {
    return cacheSendMsgQueue;
}
 
源代码评论
动弹
沙发等你来抢
 类所在包
 类方法
 同包方法