下面列出了java.util.concurrent.ConcurrentLinkedQueue#isEmpty ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 统一添加订阅
* @param list 订阅list
* @param onInfo 订阅信息
* @param symbol 订阅币对
*/
private <T> Okexv3WebSocketClient addInfo(ConcurrentLinkedQueue<WebSocketInfo<T>> list,
WebSocketInfo<T> onInfo, String symbol) {
if (!list.isEmpty()) {
// 该币对已经有订阅, 加入订阅队列即可
list.add(onInfo);
} else { // 无订阅
list.add(onInfo);
// 取出这个币对的连接
Okexv3WebSocketClient client = Okexv3WebSocketExchange.CLIENTS.get(symbol);
if (!exist(client)) {
// 该币对无连接,新建一个
client = new Okexv3WebSocketClient(symbol, super.log);
Okexv3WebSocketExchange.CLIENTS.put(symbol, client);
}
// 对这个连接订阅ticker
return client;
}
return null;
}
private IOContext getIOContext() {
IOContext ioContext = null;
final ConcurrentLinkedQueue<IOContext> recyclerQueue = mRecyclers;
if (recyclerQueue != null && !recyclerQueue.isEmpty()) {
ioContext = recyclerQueue.poll();
}
if (ioContext == null) {
ioContext = new IOContext();
}
ioContext.state = IOContext.IDLE;
ioContext.ioRecord.reset();
return ioContext;
}
@Test
public void throwsAnExceptionIfCalledFromMultipleThreads() throws InterruptedException {
final NumGen numGen = new NumGen(123456);
numGen.nextByte();
final ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
new Thread(() -> {
try {
numGen.nextByte();
} catch (RuntimeException e) {
results.add(e.getMessage());
return;
}
results.add("no exception");
}).start();
while (results.isEmpty())
Thread.sleep(10);
assertTrue(results.peek().startsWith("RanGen accessed from multiple threads"));
}
@Test
public void concurrentExecution() throws InterruptedException {
int nThreads = Runtime.getRuntime().availableProcessors();
int resultsPerThread = 10;
ConcurrentLinkedQueue<String> errors = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
int expectedResultCount = nThreads * resultsPerThread;
Listener listener = new Listener(errors, latch);
Consumer<CalculationResults> wrapper =
new ListenerWrapper(listener, expectedResultCount, ImmutableList.of(), ImmutableList.of());
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
CalculationResult result = CalculationResult.of(0, 0, Result.failure(FailureReason.ERROR, "foo"));
CalculationTarget target = new CalculationTarget() {};
CalculationResults results = CalculationResults.of(target, ImmutableList.of(result));
IntStream.range(0, expectedResultCount).forEach(i -> executor.submit(() -> wrapper.accept(results)));
latch.await();
executor.shutdown();
if (!errors.isEmpty()) {
String allErrors = errors.stream().collect(joining("\n"));
fail(allErrors);
}
}
/**
* 合并commit 位置
* ----------------------| |--------------------| |--------------------|
* node1.isCommit = true | -> | node2.isCommit=true| -> |node3.isCommit=false| ...
* ----------------------| |--------------------| |--------------------|
* <p>
* then the result remove 1node leave 2node... behind
*/
private void truncLogPosQueue(ConcurrentLinkedQueue<LogPosition> logPositions) {
if (logPositions.isEmpty()) {
LogUtils.warn.warn("no binlog position object in queue");
return;
}
// 根据concurrent list 实现方式 一次size 相当于 直接遍历一遍 链表
LogPosition curr = null;
LogPosition pre = null;
LinkedList<LogPosition> rms = new LinkedList<LogPosition>();
Iterator<LogPosition> iterator = logPositions.iterator();
while (iterator.hasNext()) {
if (pre == null) {
pre = iterator.next();
continue;
}
curr = iterator.next();
if (pre.isCommit() && curr.isCommit()) {
rms.add(pre);
}
pre = curr;
}
removeQueueWithLock(logPositions, rms);
// 轻易不要开work日志
debugLogPosition(logPositions);
}
/**
* 尝试从过期未应答队列中获取分区段进行消费
*
* @param consumer 消费者
* @return 过期未应答的分区段
*/
private PartitionSegment pollPartitionSegment(Consumer consumer, List<Short> partitionList) {
for (Short partition : partitionList) {
ConsumePartition consumePartition = new ConsumePartition(consumer.getTopic(), consumer.getApp(), partition);
ConcurrentLinkedQueue<PartitionSegment> partitionSegmentQueue = expireQueueMap.get(consumePartition);
if (partitionSegmentQueue != null && !partitionSegmentQueue.isEmpty()) {
return partitionSegmentQueue.poll();
}
}
// 获取不到,则返回空
return null;
}
/**
* Call to initiate work
*/
synchronized public void initiateWork()
{
ConcurrentLinkedQueue<EmojiWorker> taskList = getTaskList();
if (!taskList.isEmpty())
{
currentWorker = taskList.poll();
setLocation(getParent().getLocation().x + (getParent().getWidth() - getWidth()) / 2, getParent().getLocation().y + (getParent().getHeight() - getHeight()) / 2);
cancelButton.setEnabled(true);
update(currentWorker.getInitialReport());
currentWorker.execute();
}
}
private static void fillHashtableWithUnlinked(BuildingJobDataProvider provider, String startJob, Hashtable<String, SimpleBuildingJob> converted) {
ConcurrentLinkedQueue<String> toBuild = new ConcurrentLinkedQueue<>();
toBuild.offer(startJob);
while (!toBuild.isEmpty()) {
String currentName = toBuild.poll();
if (!converted.containsKey(currentName)) {
SimpleBuildingJob job = createUnlinkedJob(provider, toBuild, currentName);
converted.put(currentName, job);
}
}
}
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {
for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
final BufferAndAvailabilityProvider answer = () -> {
ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
InputValue<Object> input;
boolean moreAvailable;
synchronized (inputQueue) {
input = inputQueue.poll();
moreAvailable = !inputQueue.isEmpty();
}
if (input != null && input.isStreamEnd()) {
inputChannels[channelIndex].setReleased();
return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
} else if (input != null && input.isStreamRecord()) {
Object inputElement = input.getStreamRecord();
delegate.setInstance(inputElement);
recordSerializer.serializeRecord(delegate);
BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
recordSerializer.copyToBufferBuilder(bufferBuilder);
bufferBuilder.finish();
// Call getCurrentBuffer to ensure size is set
return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
} else if (input != null && input.isEvent()) {
AbstractEvent event = input.getEvent();
return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
} else {
return Optional.empty();
}
};
inputChannels[channelIndex].addBufferAndAvailability(answer);
inputGate.setInputChannel(new IntermediateResultPartitionID(),
inputChannels[channelIndex]);
}
}
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {
for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
final BufferAndAvailabilityProvider answer = () -> {
ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
InputValue<Object> input;
boolean moreAvailable;
synchronized (inputQueue) {
input = inputQueue.poll();
moreAvailable = !inputQueue.isEmpty();
}
if (input != null && input.isStreamEnd()) {
inputChannels[channelIndex].setReleased();
return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
} else if (input != null && input.isStreamRecord()) {
Object inputElement = input.getStreamRecord();
delegate.setInstance(inputElement);
recordSerializer.serializeRecord(delegate);
BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
recordSerializer.copyToBufferBuilder(bufferBuilder);
bufferBuilder.finish();
// Call getCurrentBuffer to ensure size is set
return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
} else if (input != null && input.isEvent()) {
AbstractEvent event = input.getEvent();
if (event instanceof EndOfPartitionEvent) {
inputChannels[channelIndex].setReleased();
}
return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
} else {
return Optional.empty();
}
};
inputChannels[channelIndex].addBufferAndAvailability(answer);
inputGate.setInputChannel(new IntermediateResultPartitionID(),
inputChannels[channelIndex]);
}
}
public CoreDataStore(CrailConfiguration conf) throws Exception {
CrailConstants.updateConstants(conf);
CrailConstants.printConf();
CrailConstants.verify();
this.bufferCache = BufferCache.createInstance(CrailConstants.CACHE_IMPL);
this.statistics = new CrailStatistics();
//Datanodes
StringTokenizer tokenizer = new StringTokenizer(CrailConstants.STORAGE_TYPES, ",");
LinkedList<StorageClient> dataNodeClients = new LinkedList<StorageClient>();
while (tokenizer.hasMoreTokens()){
String name = tokenizer.nextToken();
StorageClient dataNode = StorageClient.createInstance(name);
dataNode.init(statistics, bufferCache, conf, null);
dataNode.printConf(LOG);
dataNodeClients.add(dataNode);
}
this.datanodeEndpointCache = new EndpointCache(fsId, dataNodeClients);
//Namenode
InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
this.rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
rpcClient.init(conf, null);
rpcClient.printConf(LOG);
ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
while(!namenodeList.isEmpty()){
InetSocketAddress address = namenodeList.poll();
RpcConnection connection = rpcClient.connect(address);
connectionList.add(connection);
}
if (connectionList.size() == 1){
this.rpcConnection = connectionList.poll();
} else {
this.rpcConnection = new RpcDispatcher(connectionList);
}
LOG.info("connected to namenode(s) " + rpcConnection);
//Client
this.fsId = fsCount.getAndIncrement();
this.localClass = CrailUtils.getLocationClass();
this.blockCache = new BlockCache();
this.nextBlockCache = new NextBlockCache();
this.openInputStreams = new ConcurrentHashMap<Long, CoreInputStream>();
this.openOutputStreams = new ConcurrentHashMap<Long, CoreOutputStream>();
this.streamCounter = new AtomicLong(0);
this.isOpen = true;
this.bufferCheckpoint = new BufferCheckpoint();
this.locationMap = new ConcurrentHashMap<String, String>();
CrailUtils.parseMap(CrailConstants.LOCATION_MAP, locationMap);
this.ioStatsIn = new CoreIOStatistics("core/input");
statistics.addProvider(ioStatsIn);
this.ioStatsOut = new CoreIOStatistics("core/output");
statistics.addProvider(ioStatsOut);
this.streamStats = new CoreStreamStatistics();
statistics.addProvider(streamStats);
statistics.addProvider(bufferCache);
statistics.addProvider(datanodeEndpointCache);
}
public CoreFileSystem(CrailConfiguration conf) throws Exception {
CrailConstants.updateConstants(conf);
CrailConstants.printConf();
CrailConstants.verify();
//Datanodes
StringTokenizer tokenizer = new StringTokenizer(CrailConstants.STORAGE_TYPES, ",");
LinkedList<StorageClient> dataNodeClients = new LinkedList<StorageClient>();
while (tokenizer.hasMoreTokens()){
String name = tokenizer.nextToken();
StorageClient dataNode = StorageClient.createInstance(name);
dataNode.init(conf, null);
dataNode.printConf(LOG);
dataNodeClients.add(dataNode);
}
this.datanodeEndpointCache = new EndpointCache(fsId, dataNodeClients);
//Namenode
InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
this.rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
rpcClient.init(conf, null);
rpcClient.printConf(LOG);
ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
while(!namenodeList.isEmpty()){
InetSocketAddress address = namenodeList.poll();
RpcConnection connection = rpcClient.connect(address);
connectionList.add(connection);
}
if (connectionList.size() == 1){
this.rpcConnection = connectionList.poll();
} else {
this.rpcConnection = new RpcDispatcher(connectionList);
}
LOG.info("connected to namenode(s) " + rpcConnection);
//Client
this.fsId = fsCount.getAndIncrement();
this.localClass = CrailUtils.getLocationClass();
this.bufferCache = BufferCache.createInstance(CrailConstants.CACHE_IMPL);
this.blockCache = new BlockCache();
this.nextBlockCache = new NextBlockCache();
this.openInputStreams = new ConcurrentHashMap<Long, CoreInputStream>();
this.openOutputStreams = new ConcurrentHashMap<Long, CoreOutputStream>();
this.streamCounter = new AtomicLong(0);
this.isOpen = true;
this.bufferCheckpoint = new BufferCheckpoint();
this.locationMap = new ConcurrentHashMap<String, String>();
CrailUtils.parseMap(CrailConstants.LOCATION_MAP, locationMap);
this.statistics = new CrailStatistics();
this.ioStatsIn = new CoreIOStatistics("core/input");
statistics.addProvider(ioStatsIn);
this.ioStatsOut = new CoreIOStatistics("core/output");
statistics.addProvider(ioStatsOut);
this.streamStats = new CoreStreamStatistics();
statistics.addProvider(streamStats);
statistics.addProvider(bufferCache);
statistics.addProvider(datanodeEndpointCache);
}
@Test
public void testParserDbDataThreadSafety() throws Throwable {
final int numThreads = 500;
final String uaQuery = "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0";
final CyclicBarrier gate = new CyclicBarrier(numThreads);
final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
UdgerParser threadParser = new UdgerParser(parserDbData);
try {
gate.await();
for (int j = 0; j < 100; j++) {
UdgerUaResult qr = threadParser.parseUa(uaQuery);
assertEquals(qr.getUa(), "Firefox 40.0");
assertEquals(qr.getOs(), "Windows 10");
assertEquals(qr.getUaFamily(), "Firefox");
}
} catch (Throwable t) {
failures.add(t);
}
}
});
threads[i].start();
}
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
if (!failures.isEmpty()) {
for (Throwable throwable : failures) {
throwable.printStackTrace();
}
fail("Parsing threads failed, see printed exceptions");
}
}
@SuppressWarnings("unchecked")
private void setupInputChannels() {
for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));
inputQueues[channelIndex] = new ConcurrentLinkedQueue<>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
final BufferAndAvailabilityProvider answer = () -> {
ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
InputValue<Object> input;
boolean moreAvailable;
synchronized (inputQueue) {
input = inputQueue.poll();
moreAvailable = !inputQueue.isEmpty();
}
if (input != null && input.isStreamEnd()) {
inputChannels[channelIndex].setReleased();
return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
} else if (input != null && input.isStreamRecord()) {
Object inputElement = input.getStreamRecord();
delegate.setInstance(inputElement);
recordSerializer.serializeRecord(delegate);
BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
recordSerializer.copyToBufferBuilder(bufferBuilder);
bufferBuilder.finish();
// Call getCurrentBuffer to ensure size is set
return Optional.of(new BufferAndAvailability(bufferConsumer.build(), moreAvailable, 0));
} else if (input != null && input.isEvent()) {
AbstractEvent event = input.getEvent();
if (event instanceof EndOfPartitionEvent) {
inputChannels[channelIndex].setReleased();
}
return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
} else {
return Optional.empty();
}
};
inputChannels[channelIndex].addBufferAndAvailability(answer);
}
inputGate.setInputChannels(inputChannels);
}
public static void testCacheMultiThreaded(final BlockCache toBeTested,
final int blockSize, final int numThreads, final int numQueries,
final double passingScore) throws Exception {
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
conf);
final AtomicInteger totalQueries = new AtomicInteger();
final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
final AtomicInteger hits = new AtomicInteger();
final AtomicInteger miss = new AtomicInteger();
HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
blocksToTest.addAll(Arrays.asList(blocks));
for (int i = 0; i < numThreads; i++) {
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
if (!blocksToTest.isEmpty()) {
HFileBlockPair ourBlock = blocksToTest.poll();
// if we run out of blocks to test, then we should stop the tests.
if (ourBlock == null) {
ctx.setStopFlag(true);
return;
}
toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
false, false, true);
if (retrievedBlock != null) {
assertEquals(ourBlock.block, retrievedBlock);
toBeTested.evictBlock(ourBlock.blockName);
hits.incrementAndGet();
assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
} else {
miss.incrementAndGet();
}
totalQueries.incrementAndGet();
}
}
};
t.setDaemon(true);
ctx.addThread(t);
}
ctx.startThreads();
while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
+ miss.get());
}
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback)
throws IOException, InterruptedException {
ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(actions.size());
AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
List<CompletableFuture<R>> futures = table.<R> batch(actions);
for (int i = 0, n = futures.size(); i < n; i++) {
final int index = i;
FutureUtils.addListener(futures.get(i), (r, e) -> {
if (e != null) {
errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
"Error when processing " + actions.get(index)));
if (!ArrayUtils.isEmpty(results)) {
results[index] = e;
}
latch.countDown();
} else {
if (!ArrayUtils.isEmpty(results)) {
results[index] = r;
}
FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
(l, le) -> {
if (le != null) {
errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
"Error when finding the region for row " +
Bytes.toStringBinary(actions.get(index).getRow())));
} else {
callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
}
latch.countDown();
});
}
});
}
latch.await();
if (!errors.isEmpty()) {
throw new RetriesExhaustedException(errors.size(),
errors.stream().collect(Collectors.toList()));
}
}
private void cancelAllJobsInQueue(ConcurrentLinkedQueue<Job> jobQueue) {
while (!jobQueue.isEmpty()) {
Objects.requireNonNull(jobQueue.poll()).cancel(Objects.requireNonNull(hardFailure));
}
}
public MessageExt getMessage(String topic) {
MessageExt result = null;
ConcurrentLinkedQueue<MessageExt> messages= map.get(topic);
if (messages!=null && !messages.isEmpty()) {
result = messages.poll();
}
return result;
}
/**
* Check if this emitter has listeners for the specified event.
*
* @param event an event name.
* @return a reference to this object.
*/
public boolean hasListeners(String event) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
return callbacks != null && !callbacks.isEmpty();
}
/**
* Check if this emitter has listeners for the specified event.
*
* @param event an event name.
* @return a reference to this object.
*/
public boolean hasListeners(String event) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
return callbacks != null && !callbacks.isEmpty();
}