下面列出了 java.util.concurrent.ConcurrentLinkedDeque#add() 的实例代码；您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Allocation driver: creates two million {@code Node}s, each carrying a 2 KiB
 * payload, and appends a {@code NodeRef} for every one of them to a shared deque.
 * Whenever {@code instancesAwaitingSerialization} is positive the loop pauses
 * for 100 ms before producing the next node.
 *
 * <p>Run with -XX:+UseG1GC -Xms256m -Xmx256m
 */
public static void main(String[] args) throws Exception {
    final int numberOfNodes = 2000000;
    int backpressureAppliedCount = 0;

    // TODO find the best list/map impl: concurrently adding/iterating/removing elements
    final ConcurrentLinkedDeque<NodeRef> nodeRefs = new ConcurrentLinkedDeque<>();
    // The manager is handed the deque; the instance itself is not kept here.
    new ReferenceManager(nodeRefs);

    long created = 0;
    while (created < numberOfNodes) {
        if (instancesAwaitingSerialization > 0) {
            // Back off: this pause must be longer than the average time needed
            // to serialize *one* instance.
            Thread.sleep(100);
            backpressureAppliedCount++;
        }

        // 'created' doubles as the id of the node being built (ids start at 0).
        nodeRefs.add(new NodeRef(new Node(created, new byte[1024 * 2])));
        created++;

        // Progress report (and a short pause) after every 10000 nodes.
        if (created % 10000 == 0) {
            System.out.println("lastSerializedTime " + created + " nodes; backpressureAppliedCount=" + backpressureAppliedCount);
            Thread.sleep(100);
        }
    }
}
/**
 * iterator.remove() removes the element most recently returned by next()
 */
public void testIteratorRemove() {
    // Raw types are kept on purpose: the assertEquals(Object, Object) overload
    // is selected because the iterator yields Object.
    final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    final Random rng = new Random();
    int round = 0;
    while (round < 100) {
        final int max = rng.nextInt(5) + 2;
        final int split = rng.nextInt(max - 1) + 1;

        // Fill the deque with 1..max.
        for (int v = 1; v <= max; ++v) {
            q.add(Integer.valueOf(v));
        }

        // Walk to element 'split' and remove it through the iterator.
        Iterator it = q.iterator();
        for (int v = 1; v <= split; ++v) {
            assertEquals(it.next(), Integer.valueOf(v));
        }
        it.remove();
        assertEquals(it.next(), Integer.valueOf(split + 1));

        // Drop the remaining prefix directly from the deque.
        for (int v = 1; v <= split; ++v) {
            q.remove(Integer.valueOf(v));
        }

        // Drain the rest through a fresh iterator.
        it = q.iterator();
        for (int v = split + 1; v <= max; ++v) {
            assertEquals(it.next(), Integer.valueOf(v));
            it.remove();
        }
        assertFalse(it.hasNext());
        assertTrue(q.isEmpty());
        ++round;
    }
}
/**
 * The descending iterator visits elements in reverse insertion order
 */
public void testDescendingIteratorOrdering() {
    final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    for (int round = 0; round < 100; ++round) {
        // Insert 3, 2, 1 so that walking backwards yields 1, 2, 3.
        for (int v = 3; v >= 1; --v) {
            q.add(Integer.valueOf(v));
        }

        int seen = 0;
        Iterator it = q.descendingIterator();
        while (it.hasNext()) {
            assertEquals(++seen, it.next());
        }
        assertEquals(3, seen);

        // Empty the deque again for the next round.
        for (int removed = 0; removed < 3; ++removed) {
            q.remove();
        }
    }
}
/**
 * iterator.remove() removes the element most recently returned by next()
 */
public void testIteratorRemove() {
    // Raw types are kept on purpose: the assertEquals(Object, Object) overload
    // is selected because the iterator yields Object.
    final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    final Random rng = new Random();
    int round = 0;
    while (round < 100) {
        final int max = rng.nextInt(5) + 2;
        final int split = rng.nextInt(max - 1) + 1;

        // Fill the deque with 1..max.
        for (int v = 1; v <= max; ++v) {
            q.add(Integer.valueOf(v));
        }

        // Walk to element 'split' and remove it through the iterator.
        Iterator it = q.iterator();
        for (int v = 1; v <= split; ++v) {
            assertEquals(it.next(), Integer.valueOf(v));
        }
        it.remove();
        assertEquals(it.next(), Integer.valueOf(split + 1));

        // Drop the remaining prefix directly from the deque.
        for (int v = 1; v <= split; ++v) {
            q.remove(Integer.valueOf(v));
        }

        // Drain the rest through a fresh iterator.
        it = q.iterator();
        for (int v = split + 1; v <= max; ++v) {
            assertEquals(it.next(), Integer.valueOf(v));
            it.remove();
        }
        assertFalse(it.hasNext());
        assertTrue(q.isEmpty());
        ++round;
    }
}
/**
 * The descending iterator visits elements in reverse insertion order
 */
public void testDescendingIteratorOrdering() {
    final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    for (int round = 0; round < 100; ++round) {
        // Insert 3, 2, 1 so that walking backwards yields 1, 2, 3.
        for (int v = 3; v >= 1; --v) {
            q.add(Integer.valueOf(v));
        }

        int seen = 0;
        Iterator it = q.descendingIterator();
        while (it.hasNext()) {
            assertEquals(++seen, it.next());
        }
        assertEquals(3, seen);

        // Empty the deque again for the next round.
        for (int removed = 0; removed < 3; ++removed) {
            q.remove();
        }
    }
}
/**
 * Appends a row to the queue kept for the given table/column combination,
 * creating and registering that queue on first use.
 *
 * @param table table name; first part of the lookup key
 * @param cols  column list; appended to the key in parentheses
 * @param data  row to append
 */
public synchronized void addData(String table, String cols, DataRow data){
    final String cacheKey = table + "(" + cols +")";
    ConcurrentLinkedDeque<DataRow> queue = map.get(cacheKey);
    if(queue != null){
        queue.add(data);
        return;
    }
    // First row for this key: register an empty queue, then fill it.
    queue = new ConcurrentLinkedDeque<DataRow>();
    map.put(cacheKey, queue);
    queue.add(data);
}
/**
 * Receives client data that arrived over the network, forwards it to the
 * handler registered for the opcode and returns that handler's result.
 *
 * <p>A handler failure of type {@code TxCommitFail} is retried; the third
 * such failure is rethrown. Any other {@code MMException} is rethrown
 * immediately.
 *
 * @param opcode     command number
 * @param clientData client data
 * @param session    session
 * @param <T>        return value, may be of any type
 * @return the value produced by the handler
 * @throws Exception if no handler is registered for the opcode, or if the
 *                   handler fails and the failure is not retried
 */
public <T> T handleRequest(int opcode, Object clientData, Session session) throws Exception{
    RequestHandler handler = handlerMap.get(opcode);
    if(handler == null){
        throw new MMException("can't find handler of "+opcode);
    }
    monitorService.addMonitorNum(MonitorNumType.RequestNum,1);

    // A failure caused by a failed lock acquisition (inside a transaction) is
    // re-executed here; this only makes sure the user's transaction gets
    // another chance to run.
    T ret;
    int count = 0;
    final long startedAt = System.currentTimeMillis();
    while (true) {
        try {
            ret = handler.handle(opcode, clientData, session);
            break;
        } catch (MMException e) {
            if (e.getExceptionType() != MMException.ExceptionType.TxCommitFail) {
                throw e;
            }
            if (count++ >= 2) {
                log.error("tx commit fail after 3 times");
                throw e;
            }
            log.warn("----------TxCommitFail ----json---"+opcode);
            // Leaving the catch normally lets the loop start another attempt.
        } finally {
            // Runs after every attempt (successful, retried or rethrown): the
            // time elapsed since the first attempt is recorded and the request
            // counter is bumped.
            long finishedAt = System.currentTimeMillis();
            ConcurrentLinkedDeque<Integer> timeList = timeMap.get(opcode);
            timeList.add((int)(finishedAt-startedAt));
            requestNum.getAndIncrement();
        }
    }
    return ret;
}
/**
 * Records a failed merkle block download and decides how to proceed.
 *
 * <p>The call first waits five seconds (while holding this object's monitor,
 * since the method is synchronized). It then appends the current time to the
 * failure history for the hash and counts how many failures happened within
 * the last 30 seconds and how many happened in total:
 * <ul>
 *   <li>at most 3 recent failures: the block is requested again right away
 *       and {@code false} is returned;</li>
 *   <li>otherwise, at most 21 total failures: the hash is re-queued and
 *       {@code true} is returned;</li>
 *   <li>otherwise the hash is dropped and {@code true} is returned.</li>
 * </ul>
 *
 * @param merkleBlockHash hash of the merkle block whose download failed
 * @return true if the block download should continue from the normal download
 *         mechanism; false if a retry was issued here or the wait was interrupted
 */
protected synchronized Boolean _onFailure(final Sha256Hash merkleBlockHash) {
    try {
        Thread.sleep(5000L);
    }
    catch (final InterruptedException exception) {
        // Preserve the interrupt so callers further up can observe it.
        Thread.currentThread().interrupt();
        return false;
    }

    final Long now = _systemTime.getCurrentTimeInMilliSeconds();

    ConcurrentLinkedDeque<Long> failedDownloadTimestamps = _failedDownloadTimes.get(merkleBlockHash);
    if (failedDownloadTimestamps == null) {
        failedDownloadTimestamps = new ConcurrentLinkedDeque<Long>();
        _failedDownloadTimes.put(merkleBlockHash, failedDownloadTimestamps);
    }
    failedDownloadTimestamps.add(now);

    int totalFailureCount = 0;
    int recentFailureCount = 0;
    for (final Long failedTimestamp : failedDownloadTimestamps) {
        // A failure is "recent" when it is at most 30 seconds old. (The previous
        // comparison used '>' and therefore counted only the old failures.)
        if (now - failedTimestamp <= 30000L) {
            recentFailureCount += 1;
        }
        totalFailureCount += 1;
    }

    if (recentFailureCount <= 3) {
        Logger.debug("Retrying Merkle: " + merkleBlockHash);
        _requestMerkleBlock(merkleBlockHash);
        return false;
    }

    if (totalFailureCount <= 21) {
        // TODO: Does sequential-ness matter?
        // Add the block to the back of the stack and try again later...
        Logger.debug("Re-Queueing Merkle for Download: " + merkleBlockHash);
        _queuedBlockHashes.add(merkleBlockHash);
    }

    return true;
}
/**
 * Builds a pool segment pre-filled with {@code count} heap buffers of
 * {@code byteBufferSize} bytes each.
 *
 * <p>Every wrapper is bound to the very segment that is returned. Previously
 * the wrappers referenced a throw-away segment constructed with {@code count}
 * as its first argument, while a second, different segment was returned.
 *
 * @param byteBufferSize capacity in bytes of each allocated buffer; also the
 *                       first constructor argument of the returned segment
 * @param count          number of buffers to pre-allocate
 * @return the segment that owns the shared deque of wrappers
 */
private PoolSegment createByteBufferPoolSegment(int byteBufferSize, int count) {
    final ConcurrentLinkedDeque<ByteBufferWrapper> deq = new ConcurrentLinkedDeque<>();
    final PoolSegment poolSegment = new PoolSegment(byteBufferSize, deq);
    for (int i = 0; i < count; i++) {
        deq.add(new ByteBufferWrapper(ByteBuffer.allocate(byteBufferSize), poolSegment));
    }
    return poolSegment;
}
/**
 * size() tracks every removal and every addition
 */
public void testSize() {
    ConcurrentLinkedDeque q = populatedDeque(SIZE);

    // Shrink: the size drops by one with each removal.
    for (int remaining = SIZE; remaining > 0; --remaining) {
        assertEquals(remaining, q.size());
        q.remove();
    }

    // Grow: the size rises by one with each addition.
    int added = 0;
    while (added < SIZE) {
        assertEquals(added, q.size());
        q.add(Integer.valueOf(added));
        ++added;
    }
}
/**
 * Removing elements while iterating does not make the iterator fail
 */
public void testWeaklyConsistentIteration() {
    final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    q.add(one);
    q.add(two);
    q.add(three);

    Iterator it = q.iterator();
    while (it.hasNext()) {
        // Mutate the deque behind the iterator's back before advancing.
        q.remove();
        it.next();
    }
    assertEquals("deque should be empty again", 0, q.size());
}
/**
 * add(null) is rejected with a NullPointerException
 */
public void testAddNull() {
    final ConcurrentLinkedDeque deque = new ConcurrentLinkedDeque();
    try {
        deque.add(null);
        shouldThrow();
    } catch (NullPointerException success) {
        // expected outcome, nothing to do
    }
}
/**
 * clear() leaves the deque empty, both when it was full and when it held one element
 */
public void testClear() {
    final ConcurrentLinkedDeque deque = populatedDeque(SIZE);

    // Clearing a populated deque.
    deque.clear();
    assertTrue(deque.isEmpty());
    assertEquals(0, deque.size());

    // The deque is still usable afterwards and can be cleared again.
    deque.add(one);
    assertFalse(deque.isEmpty());
    deque.clear();
    assertTrue(deque.isEmpty());
}
/**
 * The iterator visits elements in insertion (FIFO) order
 */
public void testIteratorOrdering() {
    final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    q.add(one);
    q.add(two);
    q.add(three);

    int seen = 0;
    Iterator it = q.iterator();
    while (it.hasNext()) {
        assertEquals(++seen, it.next());
    }
    assertEquals(3, seen);
}
/**
 * clear() leaves the deque empty, both when it was full and when it held one element
 */
public void testClear() {
    final ConcurrentLinkedDeque deque = populatedDeque(SIZE);

    // Clearing a populated deque.
    deque.clear();
    assertTrue(deque.isEmpty());
    assertEquals(0, deque.size());

    // The deque is still usable afterwards and can be cleared again.
    deque.add(one);
    assertFalse(deque.isEmpty());
    deque.clear();
    assertTrue(deque.isEmpty());
}
/**
 * Registers an observer for a notification id. An observer already present
 * for that id is not added a second time.
 *
 * @param observer       observer to register
 * @param notificationId id of the notification to observe
 */
public void addObserver(NotificationObserver observer, int notificationId) {
    synchronized (observers) {
        if (!observers.containsKey(notificationId)) {
            // First observer for this id: create its collection and register it.
            final ConcurrentLinkedDeque<NotificationObserver> created = new ConcurrentLinkedDeque<>();
            created.add(observer);
            observers.put(notificationId, created);
            return;
        }
        if (observers.get(notificationId).contains(observer)) {
            return;
        }
        observers.get(notificationId).add(observer);
    }
}
/**
 * Writes 100000 records produced by the given converters to partition 0 of
 * the topic {@code prefix + topic}, then subscribes a consumer and asserts
 * that keys and values arrive in the order in which they were written.
 *
 * @param ctx             test context used for assertions and completion
 * @param prefix          prefix for the topic name and the client/group ids
 * @param producerFactory builds the write stream from producer properties
 * @param consumerFactory builds the read stream from consumer properties
 * @param keyConv         maps the message index to its key
 * @param valueConv       maps the message index to its value
 */
private <K, V> void testCodec(TestContext ctx,
                              String prefix,
                              Function<Properties,KafkaWriteStream<K, V>> producerFactory,
                              Function<Properties, KafkaReadStream<K, V>> consumerFactory,
                              Function<Integer, K> keyConv,
                              Function<Integer, V> valueConv) throws Exception {
    // Producer side.
    final Properties producerProps = kafkaCluster.useTo().getProducerProperties(prefix+"the_producer");
    final KafkaWriteStream<K, V> writer = producerFactory.apply(producerProps);
    producer = writer;
    writer.exceptionHandler(ctx::fail);

    final int numMessages = 100000;
    final ConcurrentLinkedDeque<K> expectedKeys = new ConcurrentLinkedDeque<>();
    final ConcurrentLinkedDeque<V> expectedValues = new ConcurrentLinkedDeque<>();
    int index = 0;
    while (index < numMessages) {
        final K key = keyConv.apply(index);
        final V value = valueConv.apply(index);
        // Remember what was sent so the consumer can verify it in order.
        expectedKeys.add(key);
        expectedValues.add(value);
        writer.write(new ProducerRecord<>(prefix + topic, 0, key, value));
        index++;
    }

    // Consumer side.
    final Async done = ctx.async();
    final Properties consumerProps = kafkaCluster.useTo().getConsumerProperties(prefix+"the_consumer", prefix+"the_consumer", OffsetResetStrategy.EARLIEST);
    final KafkaReadStream<K, V> reader = consumerFactory.apply(consumerProps);
    consumer = reader;
    final AtomicInteger remaining = new AtomicInteger(numMessages);
    reader.exceptionHandler(ctx::fail);
    reader.handler(received -> {
        // pop() takes from the head, i.e. the oldest expected entry.
        ctx.assertEquals(expectedKeys.pop(), received.key());
        ctx.assertEquals(expectedValues.pop(), received.value());
        if (remaining.decrementAndGet() == 0) {
            done.complete();
        }
    });
    reader.subscribe(Collections.singleton(prefix + topic));
}
/**
 * Drains {@code filesToScan}, descending into directories, and appends the
 * absolute path of every {@code .jpg} / {@code .png} file (nine-patch
 * {@code .9.png} files excluded) to {@code result}.
 *
 * <p>When {@code configuration.handleOnlyFilesFromResFolder} is set, an image
 * is accepted only if it sits in a {@code mipmap*} or {@code drawable*} folder
 * directly under a folder named {@code res}, and directories whose name starts
 * with a dot or equals {@code build} or {@code gradle} are not descended into.
 *
 * <p>Directories that cannot be listed and files without a parent path are
 * skipped instead of causing a {@code NullPointerException}.
 *
 * @param configuration scan settings; only handleOnlyFilesFromResFolder is read
 * @param filesToScan   work queue of files/directories; emptied by this call
 * @param result        receives the absolute paths of the accepted images
 */
private static void getImagesFilesToHandle(webpifier.Configuration configuration,
        ConcurrentLinkedDeque<File> filesToScan, ArrayList<String> result) {
    File file;
    while ((file = filesToScan.pollFirst()) != null) {
        final String fileName = file.getName();

        // handle images
        if (!file.isDirectory()) {
            if (isConvertibleImageName(fileName)
                    && (!configuration.handleOnlyFilesFromResFolder || isInResImageFolder(file))) {
                result.add(file.getAbsolutePath());
            }
            continue;
        }

        // handle folder
        if (configuration.handleOnlyFilesFromResFolder
                && (fileName.startsWith(".") || "build".equals(fileName) || "gradle".equals(fileName))) {
            continue;
        }
        final File[] children = file.listFiles();
        if (children == null) {
            // listFiles() returns null when the directory cannot be read.
            continue;
        }
        for (File child : children) {
            filesToScan.add(child);
        }
    }
}

/** Tells whether the name denotes a .jpg, or a .png that is not a nine-patch. */
private static boolean isConvertibleImageName(String fileName) {
    return fileName.endsWith(".jpg") || (fileName.endsWith(".png") && !fileName.endsWith(".9.png"));
}

/** Tells whether the file lies in res/mipmap* or res/drawable*. */
private static boolean isInResImageFolder(File file) {
    final File parent = file.getParentFile();
    if (parent == null || !parent.isDirectory()) {
        return false;
    }
    final String parentName = parent.getName();
    if (!parentName.startsWith("mipmap") && !parentName.startsWith("drawable")) {
        return false;
    }
    final File grandParent = parent.getParentFile();
    return grandParent != null && grandParent.isDirectory() && "res".equals(grandParent.getName());
}
/**
 * Logs the failure at debug level and appends the job to the retry queue.
 *
 * @param errMsg  reason reported for the failure
 * @param job     job that failed
 * @param retries queue that receives the job
 */
private void onFailure(String errMsg, ProducerJob job, ConcurrentLinkedDeque<ProducerJob> retries) {
    final String lineBreak = System.lineSeparator();
    final String message = String.format("OnFailure: will retry job %s.%sReason:%s", job, lineBreak, errMsg);
    log.debug(message);
    retries.add(job);
}
/**
 * Posts a list of send work requests on the queue pair.
 *
 * <p>For RDMA read requests a previously used SVCPostSend object is taken from
 * a per-size cache when one is available; its work-request fields are
 * overwritten from {@code sendWRList} and it is put back into the cache after
 * execution. In every other case a new object is obtained from
 * {@code qp.postSend}; it is cached afterwards if the request is an RDMA read
 * and freed otherwise.
 *
 * @param sendWRList work requests to post; the opcode of the first element
 *                   decides whether the cache is used
 * @throws IOException if the channel reports an error state or is stopped
 */
private void rdmaPostWRList(LinkedList<IbvSendWR> sendWRList) throws IOException {
if (isError() || isStopped.get()) {
throw new IOException("QP is in error state, can't post new requests");
}
ConcurrentLinkedDeque<SVCPostSend> stack;
SVCPostSend svcPostSendObject;
// Cache key: the number of work requests in the list ...
int numWrElements = sendWRList.size();
// Special case for 0 sgeElements when rdmaSendWithImm
// ... except that a single work request without SGEs uses a reserved key.
if (sendWRList.size() == 1 && sendWRList.getFirst().getNum_sge() == 0) {
numWrElements = NOOP_RESERVED_INDEX;
}
stack = svcPostSendCache.computeIfAbsent(numWrElements,
numElements -> new ConcurrentLinkedDeque<>());
// To avoid buffer allocations in disni update cached SVCPostSendObject
// NOTE(review): getFirst() throws NoSuchElementException for an empty list —
// presumably callers never pass one; confirm.
if (sendWRList.getFirst().getOpcode() == IbvSendWR.IbvWrOcode.IBV_WR_RDMA_READ.ordinal()
&& (svcPostSendObject = stack.pollFirst()) != null) {
// Index of the work request being rewritten inside the cached object.
int i = 0;
for (IbvSendWR sendWr: sendWRList) {
SVCPostSend.SendWRMod sendWrMod = svcPostSendObject.getWrMod(i);
sendWrMod.setWr_id(sendWr.getWr_id());
sendWrMod.setSend_flags(sendWr.getSend_flags());
// Setting up RDMA attributes
sendWrMod.getRdmaMod().setRemote_addr(sendWr.getRdma().getRemote_addr());
sendWrMod.getRdmaMod().setRkey(sendWr.getRdma().getRkey());
sendWrMod.getRdmaMod().setReserved(sendWr.getRdma().getReserved());
// Only the single-SGE case is refreshed; for any other SGE count the
// cached object's SGE values are left as they were.
if (sendWr.getNum_sge() == 1) {
IbvSge sge = sendWr.getSge(0);
sendWrMod.getSgeMod(0).setLkey(sge.getLkey());
sendWrMod.getSgeMod(0).setAddr(sge.getAddr());
sendWrMod.getSgeMod(0).setLength(sge.getLength());
}
i++;
}
} else {
// Not an RDMA read, or nothing cached for this size: build a new object.
svcPostSendObject = qp.postSend(sendWRList, null);
}
// NOTE(review): there is no try/finally here, so if execute() throws, the
// object is neither returned to the cache nor freed — confirm this is intended.
svcPostSendObject.execute();
// Cache SVCPostSend objects only for RDMA Read requests
if (sendWRList.getFirst().getOpcode() == IbvSendWR.IbvWrOcode.IBV_WR_RDMA_READ.ordinal()) {
stack.add(svcPostSendObject);
} else {
svcPostSendObject.free();
}
}