下面列出了怎么用java.util.concurrent.ConcurrentLinkedDeque的API类实例代码及写法,或者点击链接到github查看源代码。
public void run() {
synchronized (channels) {
for (Map.Entry<String, ConcurrentLinkedDeque<Tree>> entry : channels.entrySet()) {
String channel = entry.getKey();
if (!channel.startsWith(nodeID + ':')) {
continue;
}
int i = channel.indexOf(':');
channel = channel.substring(i + 1);
ConcurrentLinkedDeque<Tree> queue = entry.getValue();
while (!queue.isEmpty()) {
Tree message = queue.removeLast();
try {
received(channel, serializer.write(message));
} catch (Exception cause) {
logger.error("Unable to serialize message!", cause);
}
}
}
}
}
public JDBCLogHandler(final HttpHandler next, final String formatString, DataSource dataSource) {
this.next = next;
this.formatString = formatString;
this.dataSource = dataSource;
tableName = "access";
remoteHostField = "remoteHost";
userField = "userName";
timestampField = "timestamp";
virtualHostField = "virtualHost";
methodField = "method";
queryField = "query";
statusField = "status";
bytesField = "bytes";
refererField = "referer";
userAgentField = "userAgent";
this.pendingMessages = new ConcurrentLinkedDeque<>();
}
public void send(CarreraRequest request, MessageQueueSelector messageQueueSelector)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
TopicPublishInfo topicInfo = clusterProducer.getRocketMQProducerByIndex(0).getDefaultMQProducerImpl().getTopicPublishInfoTable().get(request.getTopic());
if (topicInfo == null || !topicInfo.ok()) { //new topic
sendSingleMessage(request);
return;
}
MessageQueue mq = messageQueueSelector.select(topicInfo.getMessageQueueList(), null, request);
request.setMessageQueue(mq);
requestQMap.computeIfAbsent(mq.getBrokerName(), _name -> {
Deque<CarreraRequest> q = new ConcurrentLinkedDeque<>();
addRequestQueue(q, _name, config.getMaxEncodeWorkerForEachBroker());
return q;
}).add(request);
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
/**
* iterator.remove() removes current element
*/
public void testIteratorRemove() {
final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
final Random rng = new Random();
for (int iters = 0; iters < 100; ++iters) {
int max = rng.nextInt(5) + 2;
int split = rng.nextInt(max - 1) + 1;
for (int j = 1; j <= max; ++j)
q.add(new Integer(j));
Iterator it = q.iterator();
for (int j = 1; j <= split; ++j)
assertEquals(it.next(), new Integer(j));
it.remove();
assertEquals(it.next(), new Integer(split + 1));
for (int j = 1; j <= split; ++j)
q.remove(new Integer(j));
it = q.iterator();
for (int j = split + 1; j <= max; ++j) {
assertEquals(it.next(), new Integer(j));
it.remove();
}
assertFalse(it.hasNext());
assertTrue(q.isEmpty());
}
}
public void addTimes(String metricName, long time) {
if (logger.isDebugEnabled()) {
logger.debug("MetricName={}, time={}", metricName, time);
}
int latestCount = conf.getSamples();
Deque<Long> deque = this.records.get(metricName);
if (deque == null) {
if (logger.isInfoEnabled()) {
logger.info("Initial timeoutsHealthIndicator, metricName={}, capacity: {}", metricName, latestCount);
}
deque = new ConcurrentLinkedDeque<>();
}
// Overflow check.
if (deque.size() >= (latestCount - 1)) {
deque.poll(); // Remove first.
}
deque.offer(time);
this.records.put(metricName, deque);
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
/**
* descendingIterator.remove() removes current element
*/
public void testDescendingIteratorRemove() {
final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
final Random rng = new Random();
for (int iters = 0; iters < 100; ++iters) {
int max = rng.nextInt(5) + 2;
int split = rng.nextInt(max - 1) + 1;
for (int j = max; j >= 1; --j)
q.add(new Integer(j));
Iterator it = q.descendingIterator();
for (int j = 1; j <= split; ++j)
assertEquals(it.next(), new Integer(j));
it.remove();
assertEquals(it.next(), new Integer(split + 1));
for (int j = 1; j <= split; ++j)
q.remove(new Integer(j));
it = q.descendingIterator();
for (int j = split + 1; j <= max; ++j) {
assertEquals(it.next(), new Integer(j));
it.remove();
}
assertFalse(it.hasNext());
assertTrue(q.isEmpty());
}
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
public JDBCLogHandler(final HttpHandler next, final String formatString, DataSource dataSource) {
this.next = next;
this.formatString = formatString;
this.dataSource = dataSource;
tableName = "access";
remoteHostField = "remoteHost";
userField = "userName";
timestampField = "timestamp";
virtualHostField = "virtualHost";
methodField = "method";
queryField = "query";
statusField = "status";
bytesField = "bytes";
refererField = "referer";
userAgentField = "userAgent";
this.pendingMessages = new ConcurrentLinkedDeque<>();
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
Collection<Queue<Boolean>> concurrentQueues() {
List<Queue<Boolean>> queues = new ArrayList<>();
queues.add(new ConcurrentLinkedDeque<Boolean>());
queues.add(new ConcurrentLinkedQueue<Boolean>());
queues.add(new ArrayBlockingQueue<Boolean>(count, false));
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect.
Collections.shuffle(queues);
return queues;
}
/**
* element() returns first element, or throws NSEE if empty
*/
public void testElement() {
ConcurrentLinkedDeque q = populatedDeque(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(i, q.element());
assertEquals(i, q.poll());
}
try {
q.element();
shouldThrow();
} catch (NoSuchElementException success) {}
}
/**
* offerFirst(x) succeeds
*/
public void testOfferFirst() {
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
assertTrue(q.offerFirst(zero));
assertTrue(q.offerFirst(one));
assertSame(one, q.peekFirst());
assertSame(zero, q.peekLast());
}
private void appendUncommittedEvent(IDomainEvent<TAggregateRootId> domainEvent) {
if (uncommittedEvents == null) {
uncommittedEvents = new ConcurrentLinkedDeque<>();
}
if (uncommittedEvents.stream().anyMatch(x -> x.getClass().equals(domainEvent.getClass()))) {
throw new UnsupportedOperationException(String.format("Cannot apply duplicated domain event type: %s, current aggregateRoot type: %s, id: %s", domainEvent.getClass(), this.getClass().getName(), id));
}
uncommittedEvents.add(domainEvent);
}
/**
* poll() succeeds unless empty
*/
public void testPoll() {
ConcurrentLinkedDeque q = populatedDeque(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(i, q.poll());
}
assertNull(q.poll());
}
/**
* isEmpty is true before add, false after
*/
public void testEmpty() {
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
assertTrue(q.isEmpty());
q.add(one);
assertFalse(q.isEmpty());
q.add(two);
q.remove();
q.remove();
assertTrue(q.isEmpty());
}
/**
* peekLast() returns next element, or null if empty
*/
public void testPeekLast() {
ConcurrentLinkedDeque q = populatedDeque(SIZE);
for (int i = SIZE - 1; i >= 0; --i) {
assertEquals(i, q.peekLast());
assertEquals(i, q.pollLast());
assertTrue(q.peekLast() == null ||
!q.peekLast().equals(i));
}
assertNull(q.peekLast());
}
/**
* Initializing from Collection of null elements throws NPE
*/
public void testConstructor4() {
try {
new ConcurrentLinkedDeque(Arrays.asList(new Integer[SIZE]));
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* offerLast(null) throws NPE
*/
public void testOfferLastNull() {
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
try {
q.offerLast(null);
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* poll() succeeds unless empty
*/
public void testPoll() {
ConcurrentLinkedDeque q = populatedDeque(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(i, q.poll());
}
assertNull(q.poll());
}
/**
* peekFirst() returns next element, or null if empty
*/
public void testPeekFirst() {
ConcurrentLinkedDeque q = populatedDeque(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(i, q.peekFirst());
assertEquals(i, q.pollFirst());
assertTrue(q.peekFirst() == null ||
!q.peekFirst().equals(i));
}
assertNull(q.peekFirst());
}
@Test
public void testConcurrentAccess() throws InterruptedException {
Deque<INode> inbound = new ConcurrentLinkedDeque<>();
Deque<INode> outbound = new ConcurrentLinkedDeque<>();
List<Runnable> threads = new ArrayList<>();
// due to the maximum number of threads used, active nodes will not be rejected here
for (int i = 0; i < MAX_ACTIVE_NODES; i++) {
threads.add(generateTempNode());
threads.add(moveTempNodeToOutbound(outbound));
threads.add(moveTempNodeToInbound(inbound));
threads.add(movePeerToActive(inbound, outbound));
}
assertConcurrent("Testing concurrent use of NodeMgr with additions to temp, inbound, outbound and active.", threads, TIME_OUT);
// print the resulting set of active peers
System.out.println(nMgr.dumpNodeInfo("self", true));
assertTrue(nMgr.activeNodesSize() <= MAX_ACTIVE_NODES);
// the following assert can fail, but under normal circumstances the odds are extremely low
// if it fails consistently there is very likely a bug in the node management
assertTrue(nMgr.activeNodesSize() > 0);
// also remove active nodes
for (int i = 0; i < MAX_ACTIVE_NODES; i++) {
threads.add(dropActive());
}
assertConcurrent("Testing concurrent use of NodeMgr use with added deletions.", threads, TIME_OUT);
// print the resulting set of active peers
System.out.println(nMgr.dumpNodeInfo("self", true));
assertTrue(nMgr.activeNodesSize() <= MAX_ACTIVE_NODES);
// the following assert can fail, but under normal circumstances the odds are extremely low
// if it fails consistently there is very likely a bug in the node management
assertTrue(nMgr.activeNodesSize() > 0);
}
@Override
public void addEditSession(EditSession session) {
ConcurrentLinkedDeque<EditSession> tmp = sessions;
if (tmp == null) tmp = new ConcurrentLinkedDeque<>();
tmp.add(session);
this.sessions = tmp;
}
/**
* pollLast() succeeds unless empty
*/
public void testPollLast() {
ConcurrentLinkedDeque q = populatedDeque(SIZE);
for (int i = SIZE - 1; i >= 0; --i) {
assertEquals(i, q.pollLast());
}
assertNull(q.pollLast());
}
/**
* add(null) throws NPE
*/
public void testAddNull() {
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
try {
q.add(null);
shouldThrow();
} catch (NullPointerException success) {}
}
static ConcurrentSkipListSet<MeanShiftSeed> doAll(
int maxIter, double[][] X, RadiusNeighbors nbrs,
ConcurrentLinkedDeque<SummaryLite> summaries) {
return getThreadPool().invoke(
new ParallelSeedExecutor(
maxIter, X, nbrs,
summaries));
}