下面列出了 java.util.concurrent.ConcurrentLinkedQueue#poll() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Works through the given partition queue, calling {@code populateTable}
 * for each partition taken from it until the queue yields no further entry.
 *
 * @param params the update parameters passed through to every
 *        {@code populateTable} call.
 * @param partitions the queue of partitions; entries are removed as they
 *        are taken.
 * @return always {@code null}.
 */
@Override
@Async
public Future<?> populateTablesAsync( AnalyticsTableUpdateParams params, ConcurrentLinkedQueue<AnalyticsTablePartition> partitions )
{
    for ( AnalyticsTablePartition next = partitions.poll(); next != null; next = partitions.poll() )
    {
        populateTable( params, next );
    }

    return null;
}
/**
 * Hands out a backend channel context for the given backend server.
 * <p>
 * A new backend connection is requested on every call. If the queue kept
 * for {@code backendServer} already holds a context, that queued context is
 * returned and the new one is passed to {@code add}; otherwise the new one
 * is returned directly.
 *
 * @param backendServer the backend server whose queue in {@code queueMap} is consulted
 * @param frontendChannelContext the frontend context passed to {@code BackendStarter.addConnection}
 * @return the queued context when one was available, otherwise the newly requested one
 */
public static ChannelContext get(BackendServerConf backendServer, ChannelContext frontendChannelContext)
{
    ConcurrentLinkedQueue<ChannelContext> pool = queueMap.get(backendServer);
    ChannelContext pooled = pool.poll();
    ChannelContext fresh = BackendStarter.addConnection(backendServer, "", 0,
            frontendChannelContext, BackendConf.getInstance(), true, 5000);
    if (pooled == null)
    {
        // Nothing was queued: the caller gets the connection just requested.
        return fresh;
    }
    // A queued context is handed out; the new connection takes its place.
    add(backendServer, fresh);
    return pooled;
}
/**
 * Drains every queue in {@code executionStateQueues}, closing the work
 * executor of each execution state taken from it, and finally clears the
 * map itself.
 *
 * @throws Exception as declared by the overridden {@code close} method
 */
@Override
public void close() throws Exception {
    for (ConcurrentLinkedQueue<ExecutionState> pending : executionStateQueues.values()) {
        for (ExecutionState state = pending.poll(); state != null; state = pending.poll()) {
            state.getWorkExecutor().close();
        }
    }
    executionStateQueues.clear();
}
/**
 * Takes the head item from {@code queue} and passes it, together with the
 * queue, to {@code writeQueueItem}. Logs a debug message instead when the
 * queue has nothing to offer.
 *
 * @param queue the queue of pending items
 */
private void writeMultipleFromQueue(final ConcurrentLinkedQueue<QueueItem> queue) {
    final QueueItem head = queue.poll();
    if (head == null) {
        UserError.Log.d(TAG, "write queue empty");
        return;
    }
    writeQueueItem(queue, head);
}
/**
 * Takes the next queued request for {@code functionName}.
 *
 * @param functionName the function whose queue (from {@code acquireQueue}) is polled
 * @return the polled request, or a request built from two empty strings
 *         when the queue had nothing to return
 * @throws TException as declared by the overridden method
 */
@Override
public DRPCRequest fetchRequest(String functionName) throws TException {
    ConcurrentLinkedQueue<DRPCRequest> pending = acquireQueue(functionName);
    DRPCRequest next = pending.poll();
    if (next == null) {
        return new DRPCRequest("", "");
    }
    LOG.info("Fetched request for " + functionName + " at " + (System.currentTimeMillis()));
    return next;
}
/**
 * Starting from {@code startJob}, works through a queue of job names and
 * stores an unlinked job in {@code converted} for every name that is not
 * already a key there. The work queue is handed to {@code createUnlinkedJob}
 * on each call.
 *
 * @param provider passed through to {@code createUnlinkedJob}
 * @param startJob the first job name placed on the work queue
 * @param converted receives one entry per job name processed
 */
private static void fillHashtableWithUnlinked(BuildingJobDataProvider provider, String startJob, Hashtable<String, SimpleBuildingJob> converted) {
    ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    pending.offer(startJob);
    // poll() yields null only once the queue is empty (null elements are not permitted).
    for (String name = pending.poll(); name != null; name = pending.poll()) {
        if (converted.containsKey(name)) {
            continue;
        }
        converted.put(name, createUnlinkedJob(provider, pending, name));
    }
}
/**
 * Creates a new {@link PublisherProcessorSignalsHolder} which holds a maximum of {@code maxBuffer} items without
 * being consumed. If more items are {@link PublisherProcessorSignalsHolder#add(Object) added} to the returned
 * {@link PublisherProcessorSignalsHolder} then the oldest item previously added to the holder will be dropped.
 *
 * @param maxBuffer Maximum number of items that can be present in the returned
 * {@link PublisherProcessorSignalsHolder}.
 * @param <T> Type of items added to the returned {@link PublisherProcessorSignalsHolder}.
 * @return A new {@link PublisherProcessorSignalsHolder}.
 */
static <T> PublisherProcessorSignalsHolder<T> fixedSizeDropHead(final int maxBuffer) {
    final ConcurrentLinkedQueue<Object> signals = new ConcurrentLinkedQueue<>();
    return new AbstractPublisherProcessorSignalsHolder<T, ConcurrentLinkedQueue<Object>>(maxBuffer, signals) {
        @Override
        void offerPastBufferSize(final Object item, final ConcurrentLinkedQueue<Object> buffer) {
            // Remove the head to make room for the incoming item.
            buffer.poll();
            // ConcurrentLinkedQueue is unbounded, so this offer cannot be rejected.
            buffer.offer(item);
        }
    };
}
/**
 * contains(x) reports true when elements added but not yet removed
 * <p>
 * NOTE(review): relies on {@code populatedQueue(SIZE)} holding the integers
 * {@code 0..SIZE-1} with {@code 0} at the head — confirm against that helper.
 */
public void testContains() {
    // Wildcard instead of a raw type: only contains(Object) and poll() are used.
    ConcurrentLinkedQueue<?> q = populatedQueue(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor;
        // contains() compares with equals(), so the lookup is unaffected.
        Integer element = Integer.valueOf(i);
        assertTrue(q.contains(element));
        q.poll();
        assertFalse(q.contains(element));
    }
}
/**
 * Obtains a transport for {@code alias}: one is taken from the queue kept
 * under that alias in {@code resources} when available, otherwise
 * {@code supplier} is asked for one. The transport handed out is also
 * recorded in the queue kept under the same alias in {@code usedResources}.
 *
 * @param alias key under which both queues are looked up (and created if absent)
 * @param supplier consulted only when no queued transport is available
 * @return the transport that was recorded as used
 */
public synchronized ITransport acquire(String alias, Supplier<ITransport> supplier) {
    ConcurrentLinkedQueue<ITransport> idle = resources.computeIfAbsent(alias,
            k -> new ConcurrentLinkedQueue<>());
    ITransport pooled = idle.poll();
    ITransport transport = pooled != null ? pooled : supplier.get();

    ConcurrentLinkedQueue<ITransport> inUse = usedResources.computeIfAbsent(alias,
            k -> new ConcurrentLinkedQueue<>());
    inUse.add(transport);
    return transport;
}
/**
 * Dumps given number of last events.
 * <p>
 * The current queue is swapped out of {@code que} (leaving {@code null}
 * behind); nothing happens when it was already {@code null}. Entries are
 * then removed from the head, counted against the size read once up front,
 * until that count is no larger than {@code n}, and the remainder is passed
 * to {@code dump}.
 *
 * @param n Number of last elements to dump.
 */
public static void dumpLastAndStop(int n) {
    ConcurrentLinkedQueue<Item> events = que.getAndSet(null);
    if (events == null) {
        return;
    }
    for (int remaining = events.size(); remaining > n; remaining--) {
        events.poll();
    }
    dump(events);
}
/**
 * Restores the keyed CEP operator from the
 * {@code cep-migration-single-pattern-afterwards-flink<version>-snapshot}
 * resource, feeds it one "start" event followed by a watermark, and checks
 * that the first of the two emitted elements is a {@code StreamRecord}
 * whose map value lists that event under {@code "start"}.
 */
@Test
public void testSinglePatternAfterMigration() throws Exception {
    KeySelector<Event, Integer> keyById = new KeySelector<Event, Integer>() {
        private static final long serialVersionUID = -4873366487571254798L;

        @Override
        public Integer getKey(Event value) throws Exception {
            return value.getId();
        }
    };

    final Event startEvent1 = new Event(42, "start", 1.0);

    OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            getKeyedCepOpearator(false, new SinglePatternNFAFactory()),
            keyById,
            BasicTypeInfo.INT_TYPE_INFO);

    try {
        harness.setup();

        String snapshotName = "cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot";
        harness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotName));
        harness.open();

        harness.processElement(new StreamRecord<>(startEvent1, 5));
        harness.processWatermark(new Watermark(20));

        ConcurrentLinkedQueue<Object> emitted = harness.getOutput();

        // Two elements are expected: the match and the watermark.
        assertEquals(2, emitted.size());

        Object first = emitted.poll();
        assertTrue(first instanceof StreamRecord);

        StreamRecord<?> record = (StreamRecord<?>) first;
        assertTrue(record.getValue() instanceof Map);

        @SuppressWarnings("unchecked")
        Map<String, List<Event>> matches = (Map<String, List<Event>>) record.getValue();
        assertEquals(startEvent1, matches.get("start").get(0));
    } finally {
        harness.close();
    }
}
/**
 * Sets up one queue-backed {@code TestInputChannel} per index below
 * {@code numInputChannels}: creates its input queue, installs a provider
 * that converts queued {@code InputValue}s into buffers, and registers the
 * channel with {@code inputGate}.
 */
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {
    for (int i = 0; i < numInputChannels; i++) {
        final int idx = i;
        final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
        final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
            new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));

        inputQueues[idx] = new ConcurrentLinkedQueue<InputValue<Object>>();
        inputChannels[idx] = new TestInputChannel(inputGate, i);

        final BufferAndAvailabilityProvider provider = () -> {
            final ConcurrentLinkedQueue<InputValue<Object>> pending = inputQueues[idx];
            final InputValue<Object> head;
            final boolean moreAvailable;
            // Take the head and read the "more available" flag under the queue's monitor.
            synchronized (pending) {
                head = pending.poll();
                moreAvailable = !pending.isEmpty();
            }

            if (head == null) {
                return Optional.empty();
            }
            if (head.isStreamEnd()) {
                inputChannels[idx].setReleased();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
            }
            if (head.isStreamRecord()) {
                Object element = head.getStreamRecord();
                delegate.setInstance(element);
                recordSerializer.serializeRecord(delegate);

                BufferBuilder builder = createBufferBuilder(bufferSize);
                recordSerializer.copyToBufferBuilder(builder);
                builder.finish();

                return Optional.of(new BufferAndAvailability(buildSingleBuffer(builder), moreAvailable, 0));
            }
            if (head.isEvent()) {
                AbstractEvent event = head.getEvent();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
            }
            // Neither stream end, record nor event: nothing to hand out.
            return Optional.empty();
        };

        inputChannels[idx].addBufferAndAvailability(provider);
        inputGate.setInputChannel(new IntermediateResultPartitionID(),
            inputChannels[idx]);
    }
}
/**
 * Tests that the AsyncWaitOperator can restart if checkpointed queue was full.
 *
 * <p>The test first takes a snapshot of an operator that was fed {@code capacity + 1}
 * elements, then restores a second operator from that snapshot and verifies that all
 * {@code capacity + 1} elements are emitted in the expected order.
 *
 * <p>See FLINK-7949
 */
@Test(timeout = 10000)
public void testRestartWithFullQueue() throws Exception {
int capacity = 10;
// 1. create the snapshot which contains capacity + 1 elements
final CompletableFuture<Void> trigger = new CompletableFuture<>();
final ControllableAsyncFunction<Integer> controllableAsyncFunction = new ControllableAsyncFunction<>(trigger);
final OneInputStreamOperatorTestHarness<Integer, Integer> snapshotHarness = new OneInputStreamOperatorTestHarness<>(
new AsyncWaitOperator<>(
controllableAsyncFunction, // NOTE(review): presumably withholds results until `trigger` completes - confirm against ControllableAsyncFunction
1000L,
capacity,
AsyncDataStream.OutputMode.ORDERED),
IntSerializer.INSTANCE);
snapshotHarness.open();
final OperatorSubtaskState snapshot;
final ArrayList<Integer> expectedOutput = new ArrayList<>(capacity + 1);
try {
// feed the first `capacity` elements while holding the checkpoint lock
synchronized (snapshotHarness.getCheckpointLock()) {
for (int i = 0; i < capacity; i++) {
snapshotHarness.processElement(i, 0L);
expectedOutput.add(i);
}
}
// the one extra element is submitted by the writer thread below
expectedOutput.add(capacity);
final OneShotLatch lastElement = new OneShotLatch();
final CheckedThread lastElementWriter = new CheckedThread() {
@Override
public void go() throws Exception {
synchronized (snapshotHarness.getCheckpointLock()) {
// signal the test thread once the checkpoint lock is held, right before submitting the extra element
lastElement.trigger();
snapshotHarness.processElement(capacity, 0L);
}
}
};
lastElementWriter.start();
// wait until the writer thread has signalled the latch
lastElement.await();
synchronized (snapshotHarness.getCheckpointLock()) {
// execute the snapshot within the checkpoint lock, because then it is guaranteed
// that the lastElementWriter has written the exceeding element
snapshot = snapshotHarness.snapshot(0L, 0L);
}
// trigger the computation to make the close call finish
trigger.complete(null);
} finally {
synchronized (snapshotHarness.getCheckpointLock()) {
snapshotHarness.close();
}
}
// 2. restore the snapshot and check that we complete
final OneInputStreamOperatorTestHarness<Integer, Integer> recoverHarness = new OneInputStreamOperatorTestHarness<>(
new AsyncWaitOperator<>(
new ControllableAsyncFunction<>(CompletableFuture.completedFuture(null)),
1000L,
capacity,
AsyncDataStream.OutputMode.ORDERED),
IntSerializer.INSTANCE);
recoverHarness.initializeState(snapshot);
synchronized (recoverHarness.getCheckpointLock()) {
recoverHarness.open();
}
synchronized (recoverHarness.getCheckpointLock()) {
recoverHarness.close();
}
// the restored operator must have emitted all capacity + 1 elements
final ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput();
assertThat(output.size(), Matchers.equalTo(capacity + 1));
final ArrayList<Integer> outputElements = new ArrayList<>(capacity + 1);
for (int i = 0; i < capacity + 1; i++) {
// unchecked cast: every polled entry is treated as a StreamRecord<Integer>
StreamRecord<Integer> streamRecord = ((StreamRecord<Integer>) output.poll());
outputElements.add(streamRecord.getValue());
}
// the emitted values must match the submission order
assertThat(outputElements, Matchers.equalTo(expectedOutput));
}
/**
 * Sets up one queue-backed {@code TestInputChannel} per index below
 * {@code numInputChannels}: creates its input queue, installs a provider
 * that converts queued {@code InputValue}s into buffers, and registers the
 * channel with {@code inputGate}.
 */
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {
    for (int i = 0; i < numInputChannels; i++) {
        final int idx = i;
        final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
        final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
            new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));

        inputQueues[idx] = new ConcurrentLinkedQueue<InputValue<Object>>();
        inputChannels[idx] = new TestInputChannel(inputGate, i);

        final BufferAndAvailabilityProvider provider = () -> {
            final ConcurrentLinkedQueue<InputValue<Object>> pending = inputQueues[idx];
            final InputValue<Object> head;
            final boolean moreAvailable;
            // Take the head and read the "more available" flag under the queue's monitor.
            synchronized (pending) {
                head = pending.poll();
                moreAvailable = !pending.isEmpty();
            }

            if (head == null) {
                return Optional.empty();
            }
            if (head.isStreamEnd()) {
                inputChannels[idx].setReleased();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
            }
            if (head.isStreamRecord()) {
                Object element = head.getStreamRecord();
                delegate.setInstance(element);
                recordSerializer.serializeRecord(delegate);

                BufferBuilder builder = createBufferBuilder(bufferSize);
                recordSerializer.copyToBufferBuilder(builder);
                builder.finish();

                return Optional.of(new BufferAndAvailability(buildSingleBuffer(builder), moreAvailable, 0));
            }
            if (head.isEvent()) {
                AbstractEvent event = head.getEvent();
                // An end-of-partition event also marks the channel as released.
                if (event instanceof EndOfPartitionEvent) {
                    inputChannels[idx].setReleased();
                }
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
            }
            // Neither stream end, record nor event: nothing to hand out.
            return Optional.empty();
        };

        inputChannels[idx].addBufferAndAvailability(provider);
        inputGate.setInputChannel(new IntermediateResultPartitionID(),
            inputChannels[idx]);
    }
}
/**
 * Restores the keyed CEP operator from the
 * {@code cep-migration-single-pattern-afterwards-flink<version>-snapshot}
 * resource, feeds it one "start" event followed by a watermark, and checks
 * that the first of the two emitted elements is a {@code StreamRecord}
 * whose map value lists that event under {@code "start"}.
 */
@Test
public void testSinglePatternAfterMigration() throws Exception {
    KeySelector<Event, Integer> keyById = new KeySelector<Event, Integer>() {
        private static final long serialVersionUID = -4873366487571254798L;

        @Override
        public Integer getKey(Event value) throws Exception {
            return value.getId();
        }
    };

    final Event startEvent1 = new Event(42, "start", 1.0);

    OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            getKeyedCepOpearator(false, new SinglePatternNFAFactory()),
            keyById,
            BasicTypeInfo.INT_TYPE_INFO);

    try {
        harness.setup();

        String snapshotName = "cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot";
        harness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotName));
        harness.open();

        harness.processElement(new StreamRecord<>(startEvent1, 5));
        harness.processWatermark(new Watermark(20));

        ConcurrentLinkedQueue<Object> emitted = harness.getOutput();

        // Two elements are expected: the match and the watermark.
        assertEquals(2, emitted.size());

        Object first = emitted.poll();
        assertTrue(first instanceof StreamRecord);

        StreamRecord<?> record = (StreamRecord<?>) first;
        assertTrue(record.getValue() instanceof Map);

        @SuppressWarnings("unchecked")
        Map<String, List<Event>> matches = (Map<String, List<Event>>) record.getValue();
        assertEquals(startEvent1, matches.get("start").get(0));
    } finally {
        harness.close();
    }
}
/**
 * Exercises {@code toBeTested} from {@code numThreads} repeating test
 * threads. Each action takes one generated block pair from a shared queue,
 * caches it, reads it back, and counts a hit (followed by an evict and a
 * re-read that must return {@code null}) or a miss. The test fails when
 * {@code hits / (hits + misses)} is below {@code passingScore}.
 *
 * @param toBeTested the cache under test
 * @param blockSize passed to {@code generateHFileBlocks}
 * @param numThreads number of test threads added to the context
 * @param numQueries number of block pairs generated
 * @param passingScore minimum acceptable hit ratio
 */
public static void testCacheMultiThreaded(final BlockCache toBeTested,
        final int blockSize, final int numThreads, final int numQueries,
        final double passingScore) throws Exception {
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        new Configuration());
    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    blocksToTest.addAll(Arrays.asList(generateHFileBlocks(numQueries, blockSize)));

    for (int threadNo = 0; threadNo < numThreads; threadNo++) {
        TestThread worker = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
            @Override
            public void doAnAction() throws Exception {
                if (blocksToTest.isEmpty()) {
                    return;
                }
                HFileBlockPair pair = blocksToTest.poll();
                // Another thread may have taken the last block in the meantime: stop the run.
                if (pair == null) {
                    ctx.setStopFlag(true);
                    return;
                }

                toBeTested.cacheBlock(pair.blockName, pair.block);
                Cacheable cached = toBeTested.getBlock(pair.blockName,
                    false, false, true);
                if (cached == null) {
                    miss.incrementAndGet();
                } else {
                    assertEquals(pair.block, cached);
                    toBeTested.evictBlock(pair.blockName);
                    hits.incrementAndGet();
                    assertNull(toBeTested.getBlock(pair.blockName, false, false, true));
                }
                totalQueries.incrementAndGet();
            }
        };
        worker.setDaemon(true);
        ctx.addThread(worker);
    }

    ctx.startThreads();
    // Wait until the queue has been drained or the context asks to stop.
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
        Thread.sleep(10);
    }
    ctx.stop();

    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
        fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
            + miss.get());
    }
}
/**
 * Polls the queue found in {@code queueArray} at the index computed by
 * {@code getNextIndex(pollIdx)}.
 *
 * @return the buffer taken from that queue, or {@code null} when it had none
 */
@Override
protected ByteBuffer queuePoll() {
    final int slot = getNextIndex(pollIdx);
    final ConcurrentLinkedQueue<ByteBuffer> selected = this.queueArray[slot];
    return selected.poll();
}
/**
 * Sets up one queue-backed {@code TestInputChannel} per index below
 * {@code numInputChannels}: creates its input queue and installs a provider
 * that converts queued {@code InputValue}s into buffers. Once all channels
 * exist they are handed to {@code inputGate} in a single call.
 */
@SuppressWarnings("unchecked")
private void setupInputChannels() {
    for (int i = 0; i < numInputChannels; i++) {
        final int idx = i;
        final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
        final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
            new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));

        inputQueues[idx] = new ConcurrentLinkedQueue<>();
        inputChannels[idx] = new TestInputChannel(inputGate, i);

        final BufferAndAvailabilityProvider provider = () -> {
            final ConcurrentLinkedQueue<InputValue<Object>> pending = inputQueues[idx];
            final InputValue<Object> head;
            final boolean moreAvailable;
            // Take the head and read the "more available" flag under the queue's monitor.
            synchronized (pending) {
                head = pending.poll();
                moreAvailable = !pending.isEmpty();
            }

            if (head == null) {
                return Optional.empty();
            }
            if (head.isStreamEnd()) {
                inputChannels[idx].setReleased();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
            }
            if (head.isStreamRecord()) {
                Object element = head.getStreamRecord();
                delegate.setInstance(element);
                recordSerializer.serializeRecord(delegate);

                // The consumer is created before the record is copied in and the builder finished.
                BufferBuilder builder = createBufferBuilder(bufferSize);
                BufferConsumer consumer = builder.createBufferConsumer();
                recordSerializer.copyToBufferBuilder(builder);
                builder.finish();

                return Optional.of(new BufferAndAvailability(consumer.build(), moreAvailable, 0));
            }
            if (head.isEvent()) {
                AbstractEvent event = head.getEvent();
                // An end-of-partition event also marks the channel as released.
                if (event instanceof EndOfPartitionEvent) {
                    inputChannels[idx].setReleased();
                }
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
            }
            // Neither stream end, record nor event: nothing to hand out.
            return Optional.empty();
        };

        inputChannels[idx].addBufferAndAvailability(provider);
    }
    inputGate.setInputChannels(inputChannels);
}
/**
 * Builds quick-documentation text for {@code element} incrementally and pushes it to the
 * active documentation component.
 *
 * <p>{@code provider} is run on a pooled application thread and is handed a consumer; every
 * object passed to that consumer is queued. A {@code SingleAlarm} callback drains the queue,
 * appends the drained items to a buffer seeded with {@code prefix}, and replaces the
 * component's text when the result differs from what the component currently shows.
 *
 * <p>A queued {@code CUT_AT_CMD} marker makes the item that follows it a "cut" string: when the
 * next item after that is processed, the buffer is truncated at the first occurrence of the cut
 * string (if it occurs) before that item is appended.
 *
 * @param element  element whose project is used and which is passed to {@code replaceText}
 * @param prefix   initial content of the text buffer
 * @param provider receives the chunk consumer; invoked on a pooled thread
 */
public static void updateQuickDocAsync(@Nonnull PsiElement element, @Nonnull CharSequence prefix, @Nonnull Consumer<Consumer<Object>> provider) {
Project project = element.getProject();
StringBuilder sb = new StringBuilder(prefix);
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
// the alarm is bound to this disposable, which is itself registered under the project
Disposable alarmDisposable = Disposable.newDisposable();
Disposer.register(project, alarmDisposable);
AtomicBoolean stop = new AtomicBoolean(false);
// holds CUT_AT_CMD right after the marker was seen, then the cut string, otherwise null
Ref<Object> cutAt = Ref.create(null);
SingleAlarm alarm = new SingleAlarm(() -> {
DocumentationComponent component = getActiveDocComponent(project);
if (component == null) {
// no active documentation component: flag the producer to stop and release the alarm
stop.set(true);
Disposer.dispose(alarmDisposable);
return;
}
// drain everything queued so far
Object s = queue.poll();
while (s != null) {
if (s == CUT_AT_CMD || cutAt.get() == CUT_AT_CMD) {
// either the marker itself or the item right after it (the cut string);
// remember it and append nothing for this item
cutAt.set(s);
s = "";
}
else if (!cutAt.isNull()) {
// a cut string is pending: truncate the buffer at its first occurrence, if any
int idx = StringUtil.indexOf(sb, cutAt.get().toString());
if (idx >= 0) sb.setLength(idx);
cutAt.set(null);
}
sb.append(s);
s = queue.poll();
}
// stop was requested: dispose the alarm's disposable; the text update below still runs
if (stop.get()) {
Disposer.dispose(alarmDisposable);
}
String newText = sb.toString() + "<br><br><br>";
String prevText = component.getText();
// only touch the component when the text actually changed
if (!Comparing.equal(newText, prevText)) {
component.replaceText(newText, element);
}
}, 100, alarmDisposable); // NOTE(review): 100 is presumably the alarm delay in milliseconds - confirm against SingleAlarm
ApplicationManager.getApplication().executeOnPooledThread(() -> {
try {
provider.consume(str -> {
ProgressManager.checkCanceled();
// abort the producer once the consumer side has been stopped
if (stop.get()) throw new ProcessCanceledException();
queue.add(str);
alarm.cancelAndRequest();
});
}
finally {
// first to flip the stop flag: request one more alarm run
if (stop.compareAndSet(false, true)) {
alarm.cancelAndRequest();
}
}
});
}
/**
 * Takes the next message queued for {@code topic}.
 *
 * @param topic key under which the message queue is looked up in {@code map}
 * @return the polled message, or {@code null} when no queue exists for the
 *         topic or the queue is empty
 */
public MessageExt getMessage(String topic) {
    ConcurrentLinkedQueue<MessageExt> pending = map.get(topic);
    if (pending == null || pending.isEmpty()) {
        return null;
    }
    return pending.poll();
}