下面列出了怎么用java.util.concurrent.BlockingDeque的API类实例代码及写法,或者点击链接到github查看源代码。
public OutputProxy(
final InputStream in,
final BlockingDeque<Event> dequeRef,
final byte delimiterPrm,
final boolean useOutPrm
)
{
dataIn = in;
deque = dequeRef;
delimiter = delimiterPrm;
useOut = useOutPrm;
data = new byte[INIT_DATA_SIZE];
dataPos = 0;
dataLimit = 0;
shutdown = false;
}
/**
* takeFirst() blocks interruptibly when empty
*/
public void testTakeFirstFromEmptyBlocksInterruptibly() {
final BlockingDeque q = new LinkedBlockingDeque();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
try {
q.takeFirst();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
await(threadStarted);
assertThreadStaysAlive(t);
t.interrupt();
awaitTermination(t);
}
/**
* takeLast() blocks interruptibly when empty
*/
public void testTakeLastFromEmptyBlocksInterruptibly() {
final BlockingDeque q = new LinkedBlockingDeque();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
try {
q.takeLast();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
await(threadStarted);
assertThreadStaysAlive(t);
t.interrupt();
awaitTermination(t);
}
private StreamsBuilder(
int threads,
int tasks,
BiFunction<Input, Integer, Integer> partitioner,
Class<? extends BlockingDeque> queueType,
int queueSize,
long queueTimeout,
Function<Integer, Input> dataSupplierFn,
Predicate<Input> filter,
Function<Input, Output> process,
Function<Output, Boolean> to,
BiConsumer<Input, Integer> post) {
this.threads = threads;
this.tasks = tasks;
this.partitioner = partitioner;
this.queueType = queueType;
this.queueSize = queueSize;
this.queueTimeout = queueTimeout;
this.dataSupplierFn = dataSupplierFn;
this.filter = filter;
this.process = process;
this.to = to;
this.post = post;
}
static void putAllCollections(Map<Class<?>, IntFunction<?>> map)
{
safePut(map, ArrayList.class, ArrayList::new);
safePut(map, HashSet.class, LinkedHashSet::new);
safePut(map, Properties.class, x -> new Properties());
safePut(map, Hashtable.class, Hashtable::new);
safePut(map, Collection.class, ArrayList::new);
safePut(map, Set.class, LinkedHashSet::new);
safePut(map, List.class, ArrayList::new);
safePut(map, SortedSet.class, x -> new TreeSet<>());
safePut(map, Queue.class, x -> new ConcurrentLinkedQueue<>());
safePut(map, Deque.class, x -> new ConcurrentLinkedDeque<>());
safePut(map, BlockingQueue.class, x -> new LinkedBlockingQueue<>());
safePut(map, BlockingDeque.class, x -> new LinkedBlockingDeque<>());
safePut(map, HashMap.class, LinkedHashMap::new);
safePut(map, LinkedHashMap.class, LinkedHashMap::new);
safePut(map, ConcurrentHashMap.class, ConcurrentHashMap::new);
safePut(map, Map.class, LinkedHashMap::new);
safePut(map, ConcurrentMap.class, x -> new ConcurrentSkipListMap<>());
safePut(map, ConcurrentNavigableMap.class, x -> new ConcurrentSkipListMap<>());
safePut(map, SortedMap.class, i -> new TreeMap<>());
}
/**
* Create a (blocking) supplier for the given topic
*
* @param topic
* @return the supplier
*/
public Supplier<String> createBlockingSupplier(final String topic) {
final BlockingDeque<String> queue = getQueue(topic);
return () -> {
while (true) {
try {
final String s = queue.pollFirst(1, TimeUnit.MINUTES);
if (s != null) {
return s;
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
}
/**
* takeFirst() blocks interruptibly when empty
*/
public void testTakeFirstFromEmptyBlocksInterruptibly() {
final BlockingDeque q = new LinkedBlockingDeque();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
try {
q.takeFirst();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
await(threadStarted);
assertThreadStaysAlive(t);
t.interrupt();
awaitTermination(t);
}
/**
* takeLast() blocks interruptibly when empty
*/
public void testTakeLastFromEmptyBlocksInterruptibly() {
final BlockingDeque q = new LinkedBlockingDeque();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
try {
q.takeLast();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
await(threadStarted);
assertThreadStaysAlive(t);
t.interrupt();
awaitTermination(t);
}
private <T> Collection<T> instantiateCollectionFromInterface(Class<? extends T> collectionType) {
if (List.class.isAssignableFrom(collectionType)) {
return new ArrayList<T>();
} else if (SortedSet.class.isAssignableFrom(collectionType)) {
return new TreeSet<T>();
} else if (Set.class.isAssignableFrom(collectionType)) {
return new LinkedHashSet<T>();
} else if (BlockingDeque.class.isAssignableFrom(collectionType)) {
return new LinkedBlockingDeque<T>();
} else if (Deque.class.isAssignableFrom(collectionType)) {
return new ArrayDeque<T>();
} else if (BlockingQueue.class.isAssignableFrom(collectionType)) {
return new LinkedBlockingDeque<T>();
} else if (Queue.class.isAssignableFrom(collectionType)) {
return new LinkedList<T>();
}
return new ArrayList<T>();
}
public static List<String> getLogs(String clusterName) {
List<String> logs = new LinkedList<>();
BlockingDeque<String> logContainer = getLogDeque(clusterName);
try {
while (!logContainer.isEmpty()) {
String log = logContainer.pollFirst(1, TimeUnit.SECONDS);
if (!Strings.isNullOrEmpty(log)) {
logs.add(log);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return logs;
}
private static BlockingDeque<String> getLogDeque(String clusterName) {
BlockingDeque<String> logDeque = INSTALLATION_LOG.get(clusterName);
if (logDeque == null) {
logDeque = new LinkedBlockingDeque<>();
}
INSTALLATION_LOG.put(clusterName, logDeque);
return logDeque;
}
private PartitionOperation<T> mergeMoreOperation(PartitionOperation<T> op,
BlockingDeque<PartitionOperation<T>> opQueue, int maxRecords) {
while (!opQueue.isEmpty() && op.getRecords().size() < maxRecords) {
if (op.getRecords().size() + opQueue.peek().getRecords().size() <= maxRecords) {
PartitionOperation<T> moreOp = opQueue.poll();
op.merge(moreOp);
} else {
break;
}
}
return op;
}
/**
* takeFirst() throws InterruptedException immediately if interrupted
* before waiting
*/
public void testTakeFirstFromEmptyAfterInterrupt() {
final BlockingDeque q = new LinkedBlockingDeque();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
Thread.currentThread().interrupt();
try {
q.takeFirst();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
awaitTermination(t);
}
/**
* takeLast() throws InterruptedException immediately if interrupted
* before waiting
*/
public void testTakeLastFromEmptyAfterInterrupt() {
final BlockingDeque q = new LinkedBlockingDeque();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
Thread.currentThread().interrupt();
try {
q.takeLast();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
awaitTermination(t);
}
public RawHttpDuplexOptions(RawHttpClient<?> client, Duration pingPeriod,
ScheduledExecutorService pingScheduler,
Supplier<BlockingDeque<MessageSender.Message>> messageQueueFactory) {
this.client = client;
this.pingPeriod = pingPeriod;
this.pingScheduler = pingScheduler;
this.messageQueueFactory = messageQueueFactory;
}
/**
* Build a {@link RawHttpDuplexOptions} instance with the configured options.
*
* @return new instance
*/
public RawHttpDuplexOptions build() {
RawHttpClient<?> client = getOrDefault(this.client, TcpRawHttpClient::new);
Duration pingPeriod = getOrDefault(this.pingPeriod, () -> Duration.ofSeconds(5));
ScheduledExecutorService pinger = getOrDefault(this.pingScheduler, Executors::newSingleThreadScheduledExecutor);
Supplier<BlockingDeque<MessageSender.Message>> messageQueueFactory = getOrDefault(
this.messageQueueFactory, Builder::defaultMessageQueue);
return new RawHttpDuplexOptions(client, pingPeriod, pinger, messageQueueFactory);
}
/**
* Create a blocking consumer for the given topic
*
* @param topic
* @return the consumer
*/
public Consumer<String> createBlockingConsumer(final String topic) {
final BlockingDeque<String> queue = getQueue(topic);
return t -> {
boolean accepted = false;
while (!accepted) {
try {
accepted = queue.offerLast(t, 1, TimeUnit.MINUTES);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
}
private synchronized BlockingDeque<String> getOrCreateQueue(final String topic) {
BlockingDeque<String> queue = queues.get(topic);
if (queue == null) {
queue = new LinkedBlockingDeque<>(queueCapacity);
queues.put(topic, queue);
}
return queue;
}
/**
* takeFirst() throws InterruptedException immediately if interrupted
* before waiting
*/
public void testTakeFirstFromEmptyAfterInterrupt() {
final BlockingDeque q = new LinkedBlockingDeque();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
Thread.currentThread().interrupt();
try {
q.takeFirst();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
awaitTermination(t);
}
/**
* takeLast() throws InterruptedException immediately if interrupted
* before waiting
*/
public void testTakeLastFromEmptyAfterInterrupt() {
final BlockingDeque q = new LinkedBlockingDeque();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
Thread.currentThread().interrupt();
try {
q.takeLast();
shouldThrow();
} catch (InterruptedException success) {}
assertFalse(Thread.interrupted());
}});
awaitTermination(t);
}
ChannelStateWriteRequestExecutorImpl(
String taskName,
ChannelStateWriteRequestDispatcher dispatcher,
BlockingDeque<ChannelStateWriteRequest> deque) {
this.taskName = taskName;
this.dispatcher = dispatcher;
this.deque = deque;
this.thread = new Thread(this::run, "Channel state writer " + taskName);
this.thread.setDaemon(true);
}
AggregateContainer(final HeartBeatTriggerManager heartBeatTriggerManager,
final KryoUtils kryoUtils,
final BlockingDeque<byte[]> workerReportsQueue,
final TaskletAggregationRequest taskletAggregationRequest) {
this.heartBeatTriggerManager = heartBeatTriggerManager;
this.kryoUtils = kryoUtils;
this.workerReportsQueue = workerReportsQueue;
this.taskletAggregationRequest = taskletAggregationRequest;
}
public DaemonHandler(final BlockingDeque<Event> dequeRef, final String... command)
{
deque = dequeRef;
processBuilder = new ProcessBuilder(command);
processBuilder.redirectError(Redirect.PIPE);
}
public BlockingDeque<ByteArrayWrapper> getPackableHashQueue() {
return packableHashQueue;
}
public void setPackableHashQueue(BlockingDeque<ByteArrayWrapper> packableHashQueue) {
this.packableHashQueue = packableHashQueue;
}
public BlockingDeque<TransactionNetPO> getUnverifiedQueue() {
return unverifiedQueue;
}
public void setUnverifiedQueue(BlockingDeque<TransactionNetPO> unverifiedQueue) {
this.unverifiedQueue = unverifiedQueue;
}
public BlockingDeque<RpcCacheMessage> getCacheMsgQueue() {
return cacheMsgQueue;
}
public void setCacheMsgQueue(BlockingDeque<RpcCacheMessage> cacheMsgQueue) {
this.cacheMsgQueue = cacheMsgQueue;
}