下面列出了java.util.concurrent.ConcurrentLinkedQueue#offer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 添加过期未应答的分区段到过期队列
* (线程安全,由于定时清理过期未应答partitionSegment和监听会话断开事件后转移到过期队列,存在并发场景)
*
* @param consumePartition 消费分区
* @param partitionSegment 分区段
*/
private void addToExpireQueue(ConsumePartition consumePartition, PartitionSegment partitionSegment) {
ConcurrentLinkedQueue<PartitionSegment> queue = expireQueueMap.get(consumePartition);
if (queue == null) {
queue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<PartitionSegment> pre = expireQueueMap.putIfAbsent(consumePartition, queue);
if (pre != null) {
queue = pre;
}
}
if (queue.contains(partitionSegment)) {
return;
}
queue.offer(partitionSegment);
// 记录下超时未应答队列的情况
long size = queue.size();
logger.debug("add expire queue, partition: {}, size: {}, start: {}, end: {}", partitionSegment.getPartition(), size, partitionSegment.getStartIndex(), partitionSegment.getEndIndex());
logger.debug("expire queue size is:[{}], partitionInfo:[{}], ", size, consumePartition);
if (queue.size() > 10000) {
logger.info("expire queue size is:[{}], partitionInfo:[{}], ", size, consumePartition);
}
}
/**
* check if the connection is not be used for a while & do connection heart beat
*
* @param linkedQueue
* @param hearBeatTime
*/
private void longIdleHeartBeat(ConcurrentLinkedQueue<BackendConnection> linkedQueue, long hearBeatTime) {
long length = linkedQueue.size();
for (int i = 0; i < length; i++) {
BackendConnection con = linkedQueue.poll();
if (con == null) {
break;
} else if (con.isClosed()) {
continue;
} else if (con.getLastTime() < hearBeatTime) { //if the connection is idle for a long time
con.setBorrowed(true);
new ConnectionHeartBeatHandler().doHeartBeat(con);
} else {
linkedQueue.offer(con);
break;
}
}
}
@Test
public void givenProducerOffersElementInQueue_WhenConsumerPollsQueue_ThenItRetrievesElement() throws Exception {
int element = 1;
ExecutorService executorService = Executors.newFixedThreadPool(2);
ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
Runnable offerTask = () -> concurrentLinkedQueue.offer(element);
Callable<Integer> pollTask = () -> {
while (concurrentLinkedQueue.peek() != null) {
return concurrentLinkedQueue.poll()
.intValue();
}
return null;
};
executorService.submit(offerTask);
TimeUnit.SECONDS.sleep(1);
Future<Integer> returnedElement = executorService.submit(pollTask);
assertThat(returnedElement.get()
.intValue(), is(equalTo(element)));
executorService.awaitTermination(1, TimeUnit.SECONDS);
executorService.shutdown();
}
/**
* Creates a new {@link PublisherProcessorSignalsHolder} which holds a maximum of {@code maxBuffer} items without
* being consumed. If more items are {@link PublisherProcessorSignalsHolder#add(Object) added} to the returned
* {@link PublisherProcessorSignalsHolder} then the oldest item previously added to the holder will be dropped.
*
* @param maxBuffer Maximum number of items that can be present in the returned
* @param <T> Type of items added to the returned {@link PublisherProcessorSignalsHolder}.
* @return A new {@link PublisherProcessorSignalsHolder}.
*/
static <T> PublisherProcessorSignalsHolder<T> fixedSizeDropHead(final int maxBuffer) {
return new AbstractPublisherProcessorSignalsHolder<T, ConcurrentLinkedQueue<Object>>(maxBuffer,
new ConcurrentLinkedQueue<>()) {
@Override
void offerPastBufferSize(final Object signal, final ConcurrentLinkedQueue<Object> queue) {
queue.poll(); // drop oldest
// Since the queue is unbounded (ConcurrentLinkedQueue) offer never fails.
queue.offer(signal);
}
};
}
/**
* offer(null) throws NPE
*/
public void testOfferNull() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
try {
q.offer(null);
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* offer(null) throws NPE
*/
public void testOfferNull() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
try {
q.offer(null);
shouldThrow();
} catch (NullPointerException success) {}
}
private static void fillHashtableWithUnlinked(BuildingJobDataProvider provider, String startJob, Hashtable<String, SimpleBuildingJob> converted) {
ConcurrentLinkedQueue<String> toBuild = new ConcurrentLinkedQueue<>();
toBuild.offer(startJob);
while (!toBuild.isEmpty()) {
String currentName = toBuild.poll();
if (!converted.containsKey(currentName)) {
SimpleBuildingJob job = createUnlinkedJob(provider, toBuild, currentName);
converted.put(currentName, job);
}
}
}
@Override
protected boolean queueOffer(ByteBuffer buffer) {
ConcurrentLinkedQueue<ByteBuffer> queue = this.queueArray[ getNextIndex(offerIdx) ];
return queue.offer( buffer );
}
@CalledOnlyBy(AmidstThread.WORKER)
private void loadPlayer(ConcurrentLinkedQueue<Player> players, SaveGamePlayer saveGamePlayer) {
players.offer(new Player(saveGamePlayer.getPlayerInformation(playerInformationProvider), saveGamePlayer));
}
private Void handle(ErrorEvent e, ConcurrentLinkedQueue<ErrorEvent> queue) {
while (queue.size() >= maxSize)
queue.poll();
queue.offer(e);
return null;
}
@CalledOnlyBy(AmidstThread.WORKER)
private void loadPlayer(ConcurrentLinkedQueue<Player> players, SaveGamePlayer saveGamePlayer) {
players.offer(new Player(saveGamePlayer.getPlayerInformation(playerInformationProvider), saveGamePlayer));
}
@Test
void requestInvalidDemand() throws Exception {
final ConcurrentLinkedQueue<NettyDataBuffer> allocatedBuffers = new ConcurrentLinkedQueue<>();
final DataBufferFactoryWrapper<NettyDataBufferFactory> factoryWrapper = new DataBufferFactoryWrapper<>(
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT) {
@Override
public NettyDataBuffer allocateBuffer() {
final NettyDataBuffer buffer = super.allocateBuffer();
allocatedBuffers.offer(buffer);
return buffer;
}
});
final CompletableFuture<HttpResponse> future = new CompletableFuture<>();
final ArmeriaServerHttpResponse response =
new ArmeriaServerHttpResponse(ctx, future, factoryWrapper, null);
response.writeWith(Mono.just(factoryWrapper.delegate().allocateBuffer().write("foo".getBytes())))
.then(Mono.defer(response::setComplete)).subscribe();
await().until(future::isDone);
assertThat(future.isCompletedExceptionally()).isFalse();
final AtomicBoolean completed = new AtomicBoolean();
final AtomicReference<Throwable> error = new AtomicReference<>();
future.get().subscribe(new Subscriber<HttpObject>() {
@Override
public void onSubscribe(Subscription s) {
s.request(0);
}
@Override
public void onNext(HttpObject httpObject) {
// Do nothing.
}
@Override
public void onError(Throwable t) {
error.compareAndSet(null, t);
completed.set(true);
}
@Override
public void onComplete() {
completed.set(true);
}
});
await().untilTrue(completed);
assertThat(error.get()).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Reactive Streams specification rule 3.9");
await().untilAsserted(() -> {
assertThat(allocatedBuffers).hasSize(1);
assertThat(allocatedBuffers.peek().getNativeBuffer().refCnt()).isZero();
allocatedBuffers.poll();
});
}