下面列出了java.util.AbstractSet#java.util.concurrent.ConcurrentLinkedQueue 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds one segment of the owning cache.
 *
 * <p>The reference queues and the bookkeeping queues are only real queues when the
 * owning map reports that it uses the corresponding feature; otherwise the reference
 * queues are {@code null} and the bookkeeping queues come from
 * {@code LocalCache.discardingQueue()}.
 *
 * @param map the cache that owns this segment
 * @param initialCapacity size of the initial entry array
 * @param maxSegmentWeight weight limit stored on this segment
 * @param statsCounter statistics sink; must not be {@code null}
 */
Segment(
    LocalCache<K, V> map,
    int initialCapacity,
    long maxSegmentWeight,
    StatsCounter statsCounter) {
  this.map = map;
  this.maxSegmentWeight = maxSegmentWeight;
  this.statsCounter = checkNotNull(statsCounter);
  // The table is initialised only after the fields above have been assigned.
  initTable(newEntryArray(initialCapacity));

  if (map.usesKeyReferences()) {
    keyReferenceQueue = new ReferenceQueue<K>();
  } else {
    keyReferenceQueue = null;
  }

  if (map.usesValueReferences()) {
    valueReferenceQueue = new ReferenceQueue<V>();
  } else {
    valueReferenceQueue = null;
  }

  if (map.usesAccessQueue()) {
    recencyQueue = new ConcurrentLinkedQueue<ReferenceEntry<K, V>>();
  } else {
    recencyQueue = LocalCache.<ReferenceEntry<K, V>>discardingQueue();
  }

  if (map.usesWriteQueue()) {
    writeQueue = new WriteQueue<K, V>();
  } else {
    writeQueue = LocalCache.<ReferenceEntry<K, V>>discardingQueue();
  }

  if (map.usesAccessQueue()) {
    accessQueue = new AccessQueue<K, V>();
  } else {
    accessQueue = LocalCache.<ReferenceEntry<K, V>>discardingQueue();
  }
}
/**
 * Drives a fusion-enabled publisher backed by a ConcurrentLinkedQueue through a
 * Flowable pipeline and checks the sequence of request sizes recorded by the observer.
 */
@Test
public void chunkOperatorCorrectlyChunksInfiniteRequestFusion() {
int chunkSize = DEFAULT_CHUNK_SIZE;
int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
// Two full chunks' worth of elements are passed to the observer.
int num = chunkSize * 2;
AbstractStreamObserverAndPublisher<Long> source =
new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
// NOTE(review): the single-thread executor created here is never shut down in this method.
AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, num);
source.onSubscribe(observer);
TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
.observeOn(Schedulers.trampoline())
.test();
testSubscriber.awaitTerminalEvent();
testSubscriber.assertComplete();
// Expected requests: one full chunk first, then three requests of the partial-chunk size.
assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
// The source must report that output fusion was established.
assertThat(source.outputFused).isTrue();
}
/**
 * Writes {@code operations} buffers to a temporary file and deletes it again.
 *
 * <p>The temporary file name is {@code filename} with a random integer appended.
 * Each buffer is taken from {@code bufferList}, cleared, written, and put back.
 * Nothing is created when {@code operations} is not positive.
 *
 * @param filename prefix of the warm-up file name
 * @param operations number of buffer writes to perform
 * @param bufferList pool of buffers that is cycled through
 * @throws Exception propagated from the file system calls
 */
private void warmUp(String filename, int operations, ConcurrentLinkedQueue<CrailBuffer> bufferList) throws Exception {
    String warmupFilename = filename + new Random().nextInt();
    System.out.println("warmUp, warmupFile " + warmupFilename + ", operations " + operations);
    if (operations <= 0) {
        return;
    }

    CrailFile warmupFile = fs.create(warmupFilename, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT).get().asFile();
    CrailBufferedOutputStream warmupStream = warmupFile.getBufferedOutputStream(0);

    int remaining = operations;
    while (remaining-- > 0) {
        // NOTE(review): poll() returns null when the pool is empty; callers must supply at least one buffer.
        CrailBuffer buffer = bufferList.poll();
        buffer.clear();
        warmupStream.write(buffer.getByteBuffer());
        bufferList.add(buffer);
    }

    warmupStream.purge().get();
    warmupStream.close();
    fs.delete(warmupFilename, false).get().syncDir();
}
/**
 * Builds a one-element flow chain for a datalake cluster upgrade.
 *
 * <p>The most recent stack status for the event's resource is looked up. If its detailed
 * status is {@code CLUSTER_UPGRADE_FAILED}, the queued trigger uses the
 * {@code CLUSTER_MANAGER_UPGRADE_FINISHED_EVENT} selector; in every other case
 * (including when no status is found) it uses {@code CLUSTER_MANAGER_UPGRADE_EVENT}.
 *
 * @param event the incoming upgrade trigger
 * @return a queue holding exactly one trigger event
 */
@Override
public Queue<Selectable> createFlowTriggerEventQueue(DatalakeClusterUpgradeTriggerEvent event) {
    Optional<StackStatus> latestStatus = stackStatusService.findFirstByStackIdOrderByCreatedDesc(event.getResourceId());

    // Fallback used when the stack has no recorded status at all.
    StackStatus fallbackStatus = new StackStatus();
    fallbackStatus.setDetailedStackStatus(DetailedStackStatus.UNKNOWN);

    DetailedStackStatus latestDetailedStatus = latestStatus.orElse(fallbackStatus).getDetailedStackStatus();
    boolean previousUpgradeFailed = DetailedStackStatus.CLUSTER_UPGRADE_FAILED.equals(latestDetailedStatus);
    String selector = previousUpgradeFailed
            ? CLUSTER_MANAGER_UPGRADE_FINISHED_EVENT.event()
            : CLUSTER_MANAGER_UPGRADE_EVENT.event();

    Queue<Selectable> flowChain = new ConcurrentLinkedQueue<>();
    flowChain.add(new DatalakeClusterUpgradeTriggerEvent(
            selector, event.getResourceId(), event.accepted(), event.getTargetImage()));
    return flowChain;
}
/**
 * Collects the connected component that contains {@code v} with a breadth-first walk
 * and appends it to {@code this.components}.
 *
 * <p>Every neighbour encountered during the walk is removed from {@code toVisit}.
 *
 * @param v start vertex of the component
 * @param toVisit vertices still awaiting processing by the caller; pruned in place
 */
private void constructConnectedComponent(V v, Collection<V> toVisit) {
    IFragment<E,V> component = new Fragment<E,V>(this.g);
    Set<V> seen = new HashSet<V>();
    Queue<V> frontier = new ConcurrentLinkedQueue<V>();

    seen.add(v);
    frontier.add(v);

    // The queue never holds null, so a null poll() means the frontier is exhausted.
    for (V current = frontier.poll(); current != null; current = frontier.poll()) {
        component.addAll(this.g.getEdges(current));
        for (V neighbour : this.g.getAdjacent(current)) {
            // Set.add reports whether the vertex is new, so one call both tests and marks it.
            if (seen.add(neighbour)) {
                frontier.add(neighbour);
            }
            toVisit.remove(neighbour);
        }
    }

    this.components.add(component);
}
/**
 * Stops the current BLE scan and resets all scan-related state.
 *
 * <p>Every tracked device connection is disposed, the scanner is stopped, and the
 * BLE client, list adapter, Bluetooth handles and bookkeeping collections are
 * re-created before the UI is refreshed.
 */
private void stopScan() {
    // Release every connection that is currently being tracked.
    for (DeviceConnection connection : this.devices_connections.values()) {
        connection.dispose();
    }
    bluetoothLeScanner.stopScan(this.mLeScanCallback);

    // Fresh client and an empty list adapter for the device list view.
    this.rxBleClient = RxBleClient.create(getApplicationContext());
    this.devicesAdapter = new DeviceAdapter(this, R.layout.list_device_item, new ArrayList<>());
    this.lv_scan.setAdapter(this.devicesAdapter);

    // Re-acquire the Bluetooth manager, adapter and LE scanner.
    this.btManager = (BluetoothManager) this.getSystemService(Context.BLUETOOTH_SERVICE);
    mBTAdapter = this.btManager.getAdapter();
    bluetoothLeScanner = this.mBTAdapter.getBluetoothLeScanner();

    // Start over with empty bookkeeping.
    this.devices_connections = new HashMap<>();
    this.devices_to_attack = new ConcurrentLinkedQueue<>();
    this.scanning = false;

    this.updateStatus();
    this.devicesAdapter.notifyDataSetChanged();
}
/**
 * Registers a factory for each supported collection and map type via {@code safePut}.
 *
 * <p>Each factory is an {@link IntFunction}. Constructor references such as
 * {@code ArrayList::new} pass the int argument to the one-int constructor; the
 * lambdas written as {@code x -> new ...()} ignore the argument.
 *
 * @param map registry that receives the type-to-factory entries
 */
static void putAllCollections(Map<Class<?>, IntFunction<?>> map)
{
// Concrete list/set/table types. Note that HashSet is backed by a LinkedHashSet factory.
safePut(map, ArrayList.class, ArrayList::new);
safePut(map, HashSet.class, LinkedHashSet::new);
safePut(map, Properties.class, x -> new Properties());
safePut(map, Hashtable.class, Hashtable::new);
// Collection interfaces and the implementation chosen for each.
safePut(map, Collection.class, ArrayList::new);
safePut(map, Set.class, LinkedHashSet::new);
safePut(map, List.class, ArrayList::new);
safePut(map, SortedSet.class, x -> new TreeSet<>());
// Queue interfaces are all backed by java.util.concurrent implementations.
safePut(map, Queue.class, x -> new ConcurrentLinkedQueue<>());
safePut(map, Deque.class, x -> new ConcurrentLinkedDeque<>());
safePut(map, BlockingQueue.class, x -> new LinkedBlockingQueue<>());
safePut(map, BlockingDeque.class, x -> new LinkedBlockingDeque<>());
// Map types. Note that HashMap and Map are both backed by a LinkedHashMap factory.
safePut(map, HashMap.class, LinkedHashMap::new);
safePut(map, LinkedHashMap.class, LinkedHashMap::new);
safePut(map, ConcurrentHashMap.class, ConcurrentHashMap::new);
safePut(map, Map.class, LinkedHashMap::new);
safePut(map, ConcurrentMap.class, x -> new ConcurrentSkipListMap<>());
safePut(map, ConcurrentNavigableMap.class, x -> new ConcurrentSkipListMap<>());
safePut(map, SortedMap.class, i -> new TreeMap<>());
}
/**
 * Collects the classes of a package and of all its sub-packages.
 *
 * <p>Packages are visited breadth-first starting from {@code packageName}; for each
 * package its sub-packages are queued and its classes (after filtering) are appended
 * to the result.
 *
 * @param packageName name of the root package
 * @param pojoClassFilter caller-supplied filter, combined via {@code getFinalFilterChain}
 * @return the classes accepted by the final filter chain, in visiting order
 */
public List<PojoClass> getPojoClassesRecursively(final String packageName, final PojoClassFilter pojoClassFilter) {
    final List<PojoClass> collected = new LinkedList<PojoClass>();
    final PojoClassFilter effectiveFilter = getFinalFilterChain(pojoClassFilter);

    final Queue<PojoPackage> unvisited = new ConcurrentLinkedQueue<PojoPackage>();
    unvisited.add(PojoPackageFactory.getPojoPackage(packageName));

    // The queue never holds null, so a null poll() means every package has been visited.
    for (PojoPackage current = unvisited.poll(); current != null; current = unvisited.poll()) {
        unvisited.addAll(current.getPojoSubPackages());
        collected.addAll(current.getPojoClasses(effectiveFilter));
    }
    return collected;
}
/**
 * Returns one instance of each concurrent queue implementation under test,
 * in random order.
 *
 * <p>The bounded {@code ArrayBlockingQueue} is included twice, once non-fair and
 * once fair, both with capacity {@code count}.
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<Queue<Boolean>>();

    // Non-blocking linked implementations.
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());

    // Bounded array-backed queue: non-fair first, then fair.
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));

    // Blocking linked implementations.
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // Further implementations (e.g. SynchronizedLinkedListQueue) are available from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html but are not included here.

    // Randomise the order to avoid the "first fast, second slow" benchmark effect.
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * Verifies that an element offered by one task can be retrieved by another task
 * polling the same {@link ConcurrentLinkedQueue}.
 *
 * <p>The consumer is submitted only after the producer's future has completed, so the
 * element is guaranteed to be in the queue without relying on a fixed sleep.
 *
 * @throws Exception if either task fails or the waiting thread is interrupted
 */
@Test
public void givenProducerOffersElementInQueue_WhenConsumerPollsQueue_ThenItRetrievesElement() throws Exception {
    int element = 1;
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
        Runnable offerTask = () -> concurrentLinkedQueue.offer(element);
        // poll() already returns null for an empty queue, so no peek-then-poll is needed.
        Callable<Integer> pollTask = concurrentLinkedQueue::poll;

        // Block until the producer has finished instead of sleeping for a fixed time.
        executorService.submit(offerTask).get();

        Future<Integer> returnedElement = executorService.submit(pollTask);
        assertThat(returnedElement.get()
            .intValue(), is(equalTo(element)));
    } finally {
        // shutdown() must come first: awaitTermination only returns early once a
        // shutdown has been requested. The finally block also releases the pool
        // when the assertion fails.
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
/**
 * Resets the provider's queue and completion flag, validates the OAuth
 * configuration and obtains the Instagram client.
 *
 * @param configurationObject not read by this method body
 * @throws NullPointerException if the configuration, any required OAuth field,
 *         the threads-per-provider setting, or the resulting client is null
 */
@Override
public void prepare(Object configurationObject) {
this.dataQueue = new ConcurrentLinkedQueue<>();
this.isCompleted = new AtomicBoolean(false);
// Fail fast on any missing configuration value.
Objects.requireNonNull(config);
Objects.requireNonNull(config.getOauth());
Objects.requireNonNull(config.getOauth().getClientId());
Objects.requireNonNull(config.getOauth().getClientSecret());
Objects.requireNonNull(config.getOauth().getAccessToken());
Objects.requireNonNull(config.getThreadsPerProvider());
try {
client = Instagram.getInstance(this.config);
} catch (InstantiationException ex) {
// The failure is only logged here; the null check below is what actually
// aborts when no client is set.
LOGGER.error("InstantiationException", ex);
}
// NOTE(review): if client already held a value from an earlier call, this check
// passes even when getInstance failed — confirm whether that is intended.
Objects.requireNonNull(client);
}
/**
 * Schedules a check that re-sends {@code item} if no incoming data has been
 * processed within the wait period.
 *
 * <p>When the check runs and more than the wait period has elapsed since
 * {@code lastProcessedIncomingData}, the item's retry counter is incremented and
 * the item is written again as long as the counter does not exceed
 * {@code MAX_QUEUE_RETRIES}.
 *
 * @param queue queue the item is re-written through
 * @param item the item awaiting a reply
 */
private void expectReply(final ConcurrentLinkedQueue<QueueItem> queue, final QueueItem item) {
    final long wait_time = 3000;

    final Runnable retryWhenSilent = new Runnable() {
        @Override
        public void run() {
            // Data arrived recently enough: nothing to do.
            if (JoH.msSince(lastProcessedIncomingData) <= wait_time) {
                return;
            }
            UserError.Log.d(TAG, "GOT NO REPLY FOR: " + item.description + " @ " + item.retries);
            item.retries++;
            if (item.retries > MAX_QUEUE_RETRIES) {
                return;
            }
            UserError.Log.d(TAG, "Retrying due to no reply: " + item.description);
            writeQueueItem(queue, item);
        }
    };

    Inevitable.task("pendiq-expect-reply-" + item.description, wait_time, retryWhenSilent);
}
/**
 * Returns one instance of each concurrent queue implementation under test,
 * in random order.
 *
 * <p>The bounded {@code ArrayBlockingQueue} is included twice, once non-fair and
 * once fair, both with capacity {@code count}.
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<Queue<Boolean>>();

    // Non-blocking linked implementations.
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());

    // Bounded array-backed queue: non-fair first, then fair.
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));

    // Blocking linked implementations.
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // Further implementations (e.g. SynchronizedLinkedListQueue) are available from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html but are not included here.

    // Randomise the order to avoid the "first fast, second slow" benchmark effect.
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * Create a new ProxyToServerConnection.
 *
 * <p>If the proxy server has a {@link ChainedProxyManager}, it is asked to fill a queue
 * of chained proxies for the initial request. The head of that queue is passed to the
 * connection as its chained proxy and the remaining queue is passed along as well.
 * Without a manager the queue stays empty and the chained proxy argument is
 * {@code null}.
 *
 * @param proxyServer the proxy server whose chained proxy manager (if any) is consulted
 * @param clientConnection the client-side connection handed to the new connection
 * @param serverHostAndPort the target server, passed through unchanged
 * @param initialFilters the filters handed to the new connection
 * @param initialHttpRequest the request used only for the chained proxy lookup
 * @param globalTrafficShapingHandler the traffic shaping handler handed to the new connection
 * @return the new connection, or {@code null} if a chained proxy manager is configured
 *         but returned no proxies
 * @throws UnknownHostException declared by this factory; presumably raised while the
 *         connection is constructed (not visible here)
 */
static ProxyToServerConnection create(DefaultHttpProxyServer proxyServer,
        ClientToProxyConnection clientConnection,
        String serverHostAndPort,
        HttpFilters initialFilters,
        HttpRequest initialHttpRequest,
        GlobalTrafficShapingHandler globalTrafficShapingHandler)
        throws UnknownHostException {
    Queue<ChainedProxy> chainedProxies = new ConcurrentLinkedQueue<ChainedProxy>();
    ChainedProxyManager chainedProxyManager = proxyServer
            .getChainProxyManager();
    if (chainedProxyManager != null) {
        chainedProxyManager.lookupChainedProxies(initialHttpRequest,
                chainedProxies);
        // isEmpty() is constant time; size() on a ConcurrentLinkedQueue walks the whole queue.
        if (chainedProxies.isEmpty()) {
            // ChainedProxyManager returned no proxies, can't connect
            return null;
        }
    }
    return new ProxyToServerConnection(proxyServer,
            clientConnection,
            serverHostAndPort,
            chainedProxies.poll(),
            chainedProxies,
            initialFilters,
            globalTrafficShapingHandler);
}
/**
 * Returns a client connected to a host chosen for the given partition key.
 *
 * <p>Replicas for the key are scanned starting at a round-robin offset; the first one
 * whose address passes the white-set check (or any replica when no white set is
 * configured) is chosen. If none qualifies, a random entry of {@code whitelist} is used.
 * A pooled client for that address is reused when available; otherwise a new one is
 * created.
 *
 * @param pk partition key used to look up replicas
 * @return a pooled or newly created client for the selected address
 */
private Client get(ByteBuffer pk)
{
    final Set<Host> replicas = metadata.getReplicas(metadata.quote(keyspace), pk);
    final int replicaCount = replicas.size();

    InetAddress target = null;
    if (replicaCount > 0)
    {
        int start = roundrobin.incrementAndGet() % replicaCount;
        // The counter may have wrapped to a negative value; fold the remainder back.
        if (start < 0)
            start = -start;

        for (int offset = 0; target == null && offset < replicaCount; offset++)
        {
            Host candidate = Iterators.get(replicas.iterator(), (start + offset) % replicaCount);
            if (whiteset == null || whiteset.contains(candidate.getAddress()))
                target = candidate.getAddress();
        }
    }

    if (target == null)
        target = whitelist.get(ThreadLocalRandom.current().nextInt(whitelist.size()));

    ConcurrentLinkedQueue<Client> pool = cache.get(target);
    if (pool == null)
    {
        ConcurrentLinkedQueue<Client> fresh = new ConcurrentLinkedQueue<Client>();
        ConcurrentLinkedQueue<Client> raced = cache.putIfAbsent(target, fresh);
        // Keep whichever queue ended up in the cache.
        pool = (raced != null) ? raced : fresh;
    }

    Client pooled = pool.poll();
    return pooled != null
         ? pooled
         : new Client(settings.getRawThriftClient(target.getHostAddress()), target);
}
/**
 * Creates an instance from sequences that are already in memory.
 *
 * <p>The given queue is copied, no file reader is attached, the data is flagged as
 * fully read, and the processed counter starts at the number of copied sequences.
 *
 * @param seqList sequences to copy into this instance
 */
private FastaData(ConcurrentLinkedQueue<Sequence> seqList)
{
    // No backing file for this instance.
    this.fileReader = null;
    this.lastLine = null;
    this.offset = 0;
    this.readFullFile = true;

    // Copy first: the counter below is initialised from the copy's size.
    this.sequenceList = new ConcurrentLinkedQueue<Sequence>(seqList);
    this.numberProcessed = new AtomicLong(this.sequenceList.size());
}
/**
 * Constructs a stream bound to the given connection and local ID.
 *
 * <p>The stream starts open, with an empty read queue and with its write-ready
 * flag cleared.
 *
 * @param adbConn AdbConnection that this stream is running on
 * @param localId Local ID of the stream
 */
public AdbStream(AdbConnection adbConn, int localId) {
    // Identity of the stream.
    this.localId = localId;
    this.adbConn = adbConn;

    // Initial state: open, nothing received yet, not yet ready for writes.
    this.isClosed = false;
    this.writeReady = new AtomicBoolean(false);
    this.readQueue = new ConcurrentLinkedQueue<byte[]>();
}
/**
 * Replaces the current queue with a new empty one.
 *
 * <p>Nothing happens when the reference currently holds {@code null} (which the
 * original code treats as "stopped"), or when another thread swaps the reference
 * between the read and the compare-and-set.
 */
public static void reset() {
    final ConcurrentLinkedQueue<Item> current = que.get();
    if (current == null) {
        // Stopped: leave the reference as it is.
        return;
    }
    que.compareAndSet(current, new ConcurrentLinkedQueue<Item>());
}
/**
 * Issues a request with {@code overridingStrategy} and records an error for every
 * callback for which {@code isInvalidThread()} reports true. The test passes when
 * no such error was recorded.
 */
@Test
public void requestRespectsDisable() throws Exception {
// Callbacks may run on other threads, hence a concurrent collection for the errors.
ConcurrentLinkedQueue<AssertionError> errors = new ConcurrentLinkedQueue<>();
// Hook 1: demand signalled on the request payload.
StreamingHttpRequest req = client.get("/").transformPayloadBody(p -> p.beforeRequest(__ -> {
if (isInvalidThread()) {
errors.add(new AssertionError("Invalid thread called request-n. Thread: "
+ currentThread()));
}
}));
client.request(overridingStrategy, req)
// Hook 2: response metadata delivered.
.beforeOnSuccess(__ -> {
if (isInvalidThread()) {
errors.add(new AssertionError("Invalid thread called response metadata. " +
"Thread: " + currentThread()));
}
})
.flatMapPublisher(StreamingHttpResponse::payloadBody)
// Hook 3: each response payload item.
.beforeOnNext(__ -> {
if (isInvalidThread()) {
errors.add(new AssertionError("Invalid thread called response payload onNext. " +
"Thread: " + currentThread()));
}
})
// Hook 4: response payload completion.
.beforeOnComplete(() -> {
if (isInvalidThread()) {
errors.add(new AssertionError("Invalid thread called response payload onComplete. " +
"Thread: " + currentThread()));
}
}).toFuture().get();
assertThat("Unexpected errors: " + errors, errors, hasSize(0));
}
/**
 * Runs the constructor test builder against {@code PublisherWindowStartEnd},
 * supplying a reference value for each named constructor argument.
 */
@Test
public void constructors() {
    Function<Object, Publisher<Object>> endSelector = o -> PublisherNever.instance();
    // Two distinct supplier instances, one per queue argument.
    Supplier<Queue<Object>> drainQueues = () -> new ConcurrentLinkedQueue<>();
    Supplier<Queue<Object>> processorQueues = () -> new ConcurrentLinkedQueue<>();

    ConstructorTestBuilder builder = new ConstructorTestBuilder(PublisherWindowStartEnd.class);
    builder.addRef("source", PublisherNever.instance());
    builder.addRef("start", PublisherNever.instance());
    builder.addRef("end", endSelector);
    builder.addRef("drainQueueSupplier", drainQueues);
    builder.addRef("processorQueueSupplier", processorQueues);
    builder.test();
}
/**
 * Copies the given messages, clears {@code cache}, and dispatches the copy in
 * parallel when it is not empty.
 *
 * @param messages messages to snapshot and dispatch
 */
private void commitMessage(ConcurrentLinkedQueue<T> messages) {
    LinkedList<T> snapshot = new LinkedList<T>(messages);
    // NOTE(review): if messages and cache are the same queue, anything added between
    // the snapshot above and this clear() is dropped without being dispatched — confirm.
    cache.clear();
    if (snapshot.isEmpty()) {
        return;
    }
    parallelDispatch(snapshot);
    snapshot.clear();
}
/**
 * Builds the three-step flow chain for the given stack event: external database
 * creation, then stack creation, then cluster creation.
 *
 * <p>The first two events carry the incoming event's {@code accepted()} value; the
 * cluster creation event is built from the selector and resource id only.
 *
 * @param event the triggering stack event
 * @return the ordered queue of events to run
 */
@Override
public Queue<Selectable> createFlowTriggerEventQueue(StackEvent event) {
    Long stackId = event.getResourceId();

    Queue<Selectable> steps = new ConcurrentLinkedQueue<>();
    steps.add(new StackEvent(START_EXTERNAL_DATABASE_CREATION_EVENT.event(), stackId, event.accepted()));
    steps.add(new StackEvent(START_CREATION_EVENT.event(), stackId, event.accepted()));
    steps.add(new StackEvent(CLUSTER_CREATION_EVENT.event(), stackId));
    return steps;
}
/**
 * Removes the leading run of committed log positions from the queue and returns the
 * last one of that run.
 *
 * <p>The queue is scanned from its head; scanning stops at the first position that is
 * not a commit, or once the iteration limit {@code THROTTLE_QUEUE_SIZE} is reached.
 * All committed positions seen are handed to {@code removeQueueWithLock}. The last
 * committed position is merged with {@code originPos}, refreshed and returned.
 *
 * @return the last committed position of the leading run, or {@code null} when the
 *         queue is unset or its head is not a commit
 */
@Override
public LogPosition getLatestLogPosWithRm() {
// Work on a local reference so the field is read only once.
ConcurrentLinkedQueue<LogPosition> logPositions = this.logPositions;
if (logPositions == null) {
return null;
}
// Do not enable the work log lightly.
debugLogPosition(logPositions);
LogPosition curr;
LogPosition pre = null;
int len = 0;
List<LogPosition> rms = new LinkedList<>();
// Bound the number of iterations to avoid an endless loop. Queue length is not a
// concern because truncating log positions keeps it in check.
Iterator<LogPosition> iter = logPositions.iterator();
while (iter.hasNext() && (curr = iter.next()) != null
&& len++ < THROTTLE_QUEUE_SIZE) {
if (curr.isCommit()) {
rms.add(curr); // add to the removal list
pre = curr;
continue;
}
break; // stop at the first position that is not a commit
}
removeQueueWithLock(logPositions, rms);
if (pre != null) {
pre.mergeOriginLogPos(originPos);
pre.refresh();
return pre;
}
return null;
}
/**
 * Stores the collaborators this processor works with. No other work is done here.
 *
 * @param availableQueue stored as {@code this.availableQueue}
 * @param loadingQueue stored as {@code this.loadingQueue}
 * @param recycleQueue stored as {@code this.recycleQueue}
 * @param cache stored as {@code this.cache}
 * @param layerManager stored as {@code this.layerManager}
 * @param dimensionSetting stored as {@code this.dimensionSetting}
 */
@CalledByAny
public FragmentQueueProcessor(
        ConcurrentLinkedQueue<Fragment> availableQueue,
        ConcurrentLinkedQueue<Fragment> loadingQueue,
        ConcurrentLinkedQueue<Fragment> recycleQueue,
        FragmentCache cache,
        LayerManager layerManager,
        Setting<Dimension> dimensionSetting) {
    // Services and settings.
    this.dimensionSetting = dimensionSetting;
    this.layerManager = layerManager;
    this.cache = cache;

    // The three fragment queues.
    this.recycleQueue = recycleQueue;
    this.loadingQueue = loadingQueue;
    this.availableQueue = availableQueue;
}
/**
 * Records one emitted key/value pair.
 *
 * <p>When reducing is enabled, the value is appended to the per-key queue in
 * {@code reduceMap} (created on first use). Otherwise the pair is added to
 * {@code mapQueue} as a {@code KeyValue}.
 *
 * @param key the emitted key
 * @param value the emitted value
 */
@Override
public void emit(K key, V value) {
    if (!this.doReduce) {
        this.mapQueue.add(new KeyValue<>(key, value));
        return;
    }
    this.reduceMap
        .computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>())
        .add(value);
}
/**
 * Stores the collaborators this processor works with. No other work is done here.
 *
 * @param availableQueue stored as {@code this.availableQueue}
 * @param loadingQueue stored as {@code this.loadingQueue}
 * @param recycleQueue stored as {@code this.recycleQueue}
 * @param cache stored as {@code this.cache}
 * @param layerManager stored as {@code this.layerManager}
 * @param dimensionSetting stored as {@code this.dimensionSetting}
 */
@CalledByAny
public FragmentQueueProcessor(
        ConcurrentLinkedQueue<Fragment> availableQueue,
        ConcurrentLinkedQueue<Fragment> loadingQueue,
        ConcurrentLinkedQueue<Fragment> recycleQueue,
        FragmentCache cache,
        LayerManager layerManager,
        Setting<Dimension> dimensionSetting) {
    // Services and settings.
    this.dimensionSetting = dimensionSetting;
    this.layerManager = layerManager;
    this.cache = cache;

    // The three fragment queues.
    this.recycleQueue = recycleQueue;
    this.loadingQueue = loadingQueue;
    this.availableQueue = availableQueue;
}
/**
 * Feeds watermarks and elements into both inputs of the harness and checks the
 * records collected on the two side outputs of {@code FunctionWithSideOutput}.
 */
@Test
public void testSideOutput() throws Exception {
// The harness is closed automatically, even when an assertion fails.
try (
TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
new FunctionWithSideOutput(), STATE_DESCRIPTOR)
) {
testHarness.processWatermark1(new Watermark(10L));
testHarness.processWatermark2(new Watermark(10L));
// Element on the second input, timestamp 12.
testHarness.processElement2(new StreamRecord<>(5, 12L));
testHarness.processWatermark1(new Watermark(40L));
testHarness.processWatermark2(new Watermark(40L));
// Two elements on the first input, timestamps 13 and 15.
testHarness.processElement1(new StreamRecord<>("6", 13L));
testHarness.processElement1(new StreamRecord<>("6", 15L));
testHarness.processWatermark1(new Watermark(50L));
testHarness.processWatermark2(new Watermark(50L));
// Expected record on the BROADCAST_TAG side output.
ConcurrentLinkedQueue<StreamRecord<String>> expectedBr = new ConcurrentLinkedQueue<>();
expectedBr.add(new StreamRecord<>("BR:5 WM:10 TS:12", 12L));
// Expected records on the NON_BROADCAST_TAG side output.
ConcurrentLinkedQueue<StreamRecord<String>> expectedNonBr = new ConcurrentLinkedQueue<>();
expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:13", 13L));
expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:15", 15L));
ConcurrentLinkedQueue<StreamRecord<String>> brSideOutput = testHarness.getSideOutput(FunctionWithSideOutput.BROADCAST_TAG);
ConcurrentLinkedQueue<StreamRecord<String>> nonBrSideOutput = testHarness.getSideOutput(FunctionWithSideOutput.NON_BROADCAST_TAG);
TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedBr, brSideOutput);
TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedNonBr, nonBrSideOutput);
}
}
/**
 * Returns the queue registered for the given table, creating and registering it
 * (together with the table's info) on first use.
 *
 * <p>The whole lookup-and-create sequence runs while holding {@code syncLock}.
 *
 * @param table table whose queue is requested
 * @return the existing queue, or the newly created one
 */
protected ConcurrentLinkedQueue<DbQueueModel> getQueue(Table<?> table) {
    synchronized (syncLock) {
        ConcurrentLinkedQueue<DbQueueModel> existing = TABLE_QUEUE.get(table.tableName());
        if (existing != null) {
            return existing;
        }

        ConcurrentLinkedQueue<DbQueueModel> created = new ConcurrentLinkedQueue<>();
        TABLE_QUEUE.putIfAbsent(table.tableName(), created);
        TABLE_INFO.putIfAbsent(table.tableName(), table.getTableInfo());
        return created;
    }
}
/**
 * Verifies that we don't have leakage between different keys.
 *
 * <p>Elements for keys 17, 13 and "42" are processed while processing time advances
 * from 1 to 7. The expected output contains the three input echoes followed by
 * state output for keys 17 and 42 only.
 */
@Test
public void testProcessingTimeTimerWithState() throws Exception {
LegacyKeyedCoProcessOperator<String, Integer, String, String> operator =
new LegacyKeyedCoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
operator,
new IntToStringKeySelector<>(),
new IdentityKeySelector<String>(),
BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setup();
testHarness.open();
testHarness.setProcessingTime(1);
testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
testHarness.processElement1(new StreamRecord<>(13)); // should set timer for 6
testHarness.setProcessingTime(2);
testHarness.processElement1(new StreamRecord<>(13)); // should delete timer again
testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
// Advance processing time step by step to 6 and then 7.
testHarness.setProcessingTime(6);
testHarness.setProcessingTime(7);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>("INPUT1:17"));
expectedOutput.add(new StreamRecord<>("INPUT1:13"));
expectedOutput.add(new StreamRecord<>("INPUT2:42"));
// No "STATE:13" entry is expected.
expectedOutput.add(new StreamRecord<>("STATE:17"));
expectedOutput.add(new StreamRecord<>("STATE:42"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
// NOTE(review): close() is skipped when the assertion above fails.
testHarness.close();
}
/**
 * Processes two records after watermark 1 and checks that the operator tracks each
 * watermark and that the harness output consists of exactly the two watermarks.
 */
@Test
public void testLateElements() throws Exception {
    testHarness.processWatermark(1L);
    assertEquals(1L, operator.lastWatermark);

    // Records with timestamps 0 and 1, processed after watermark 1.
    testHarness.processElement(record(K1, 0L));
    testHarness.processElement(record(K1, 1L));

    testHarness.processWatermark(2L);
    assertEquals(2L, operator.lastWatermark);

    // Only the watermarks are expected in the output; neither record appears.
    Queue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    expectedOutput.add(watermark(1L));
    expectedOutput.add(watermark(2L));

    Queue<Object> actualOutput = testHarness.getOutput();
    TestHarnessUtil.assertOutputEquals("Unexpected output", expectedOutput, actualOutput);
}