下面列出了怎么用java.util.concurrent.PriorityBlockingQueue的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void addRecords(HostAndPort hostAndPort, List<StoreRecord> records) {
store.compute(hostAndPort, (key, value) -> {
Map<String, Queue<StoreRecord>> data = Optional.ofNullable(value)
.orElse(new ConcurrentHashMap<>());
for (StoreRecord record : records) {
data.compute(record.getSchemeName(), (dKey, dValue) -> {
Queue<StoreRecord> queue = Optional.ofNullable(dValue)
.orElse(new PriorityBlockingQueue<>(16,
Comparator.comparingLong(o -> o.getTimestamp())));
queue.add(record);
return queue;
});
}
return data;
});
}
/**
* drainTo(c) empties queue into another collection c
*/
public void testDrainTo() {
PriorityBlockingQueue q = populatedQueue(SIZE);
ArrayList l = new ArrayList();
q.drainTo(l);
assertEquals(0, q.size());
assertEquals(SIZE, l.size());
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
q.add(zero);
q.add(one);
assertFalse(q.isEmpty());
assertTrue(q.contains(zero));
assertTrue(q.contains(one));
l.clear();
q.drainTo(l);
assertEquals(0, q.size());
assertEquals(2, l.size());
for (int i = 0; i < 2; ++i)
assertEquals(l.get(i), new Integer(i));
}
/**
* Class Constructor.
* @param partitionId ID of the partition.
* @param storePartition {@link StorePartition} associated with the given partition ID.
* @param feedCachePartition {@link FeedCachePartition} associated with the given partition ID.
* @param transactionFetcher {@link TransactionFetcher} associated with the {@code WaltzServer} to which the partition is part of.
* @param config the config of the {@code WaltzServer} to which the partition is part of.
*/
public Partition(int partitionId, StorePartition storePartition, FeedCachePartition feedCachePartition, TransactionFetcher transactionFetcher, WaltzServerConfig config) {
this.partitionId = partitionId;
this.lockTableSize = (int) config.get(WaltzServerConfig.OPTIMISTIC_LOCK_TABLE_SIZE);
this.minFetchSize = (int) config.get(WaltzServerConfig.MIN_FETCH_SIZE);
this.realtimeThreshold = (int) config.get(WaltzServerConfig.REALTIME_THRESHOLD);
this.storePartition = storePartition;
this.appendTask = new AppendTask();
this.nearRealtimeFeedTask = new FeedTask("R", new PriorityBlockingQueue<>(100, FeedContext.HIGH_WATER_MARK_COMPARATOR));
this.catchupFeedTask = new FeedTask("C", new LinkedBlockingQueue<>());
this.pausedFeedContexts = new LinkedList<>();
this.metricsGroup = String.format("%s.partition-%d", MetricGroup.WALTZ_SERVER_METRIC_GROUP, partitionId);
// Register metrics
registerMetrics();
this.feedCachePartition = feedCachePartition;
this.transactionFetcher = transactionFetcher;
}
@SuppressWarnings("unchecked")
protected static <T> Queue<T> createSimilarQueue(Queue<T> orig) {
if (orig instanceof ArrayBlockingQueue) {
ArrayBlockingQueue queue = (ArrayBlockingQueue) orig;
return new ArrayBlockingQueue<T>(queue.size() + queue.remainingCapacity());
} else if (orig instanceof ArrayDeque) {
return new ArrayDeque<T>();
} else if (orig instanceof ConcurrentLinkedQueue) {
return new ConcurrentLinkedQueue<T>();
} else if (orig instanceof DelayQueue) {
return new DelayQueue();
} else if (orig instanceof LinkedBlockingDeque) {
return new LinkedBlockingDeque<T>();
} else if (orig instanceof LinkedBlockingQueue) {
return new LinkedBlockingQueue<T>();
} else if (orig instanceof PriorityBlockingQueue) {
return new PriorityBlockingQueue<T>();
} else if (orig instanceof PriorityQueue) {
return new PriorityQueue<T>(11, ((PriorityQueue) orig).comparator());
} else if (orig instanceof SynchronousQueue) {
return new SynchronousQueue<T>();
} else {
return new LinkedList<T>();
}
}
/**
* Construct a new limiter, providing the implementation of how tasks should be sorted relative
* to each other. The {@link Comparator} provided must deterministically provide ordering such
* that two tasks must always have the same relative order.
* <p>
* This constructor allows you to specify if listeners /
* {@link org.threadly.concurrent.future.FutureCallback}'s / functions in
* {@link ListenableFuture#map(java.util.function.Function)} or
* {@link ListenableFuture#flatMap(java.util.function.Function)} should be counted towards the
* concurrency limit. Specifying {@code false} will release the limit as soon as the original
* task completes. Specifying {@code true} will continue to enforce the limit until all listeners
* (without an executor) complete.
*
* @param executor {@link Executor} to submit task executions to.
* @param maxConcurrency maximum quantity of tasks to run in parallel
* @param limitFutureListenersExecution {@code true} to include listener / mapped functions towards execution limit
* @param sorter Implementation of {@link Comparator} to sort the task queue being limited
*/
@SuppressWarnings("unchecked")
public OrderedExecutorLimiter(Executor executor, int maxConcurrency,
boolean limitFutureListenersExecution,
final Comparator<? super T> sorter) {
ArgumentVerifier.assertNotNull(sorter, "sorter");
limiter = new ExecutorLimiter(executor, maxConcurrency, limitFutureListenersExecution,
new PriorityBlockingQueue<>(INITIAL_QUEUE_SIZE, (rc1, rc2) -> {
T r1 = runnableTypeFromContainer(rc1);
T r2 = runnableTypeFromContainer(rc2);
return sorter.compare(r1, r2);
})) {
@Override
protected boolean taskCapacity() {
checkTaskCapacity();
return super.taskCapacity();
}
};
}
public BlockingQueue<Runnable> create(final Options options, final String prefix, final int queueSize) {
switch (this) {
case ARRAY: {
return new ArrayBlockingQueue<>(queueSize > 0 ? queueSize : 1);
}
case LINKED: {
return new LinkedBlockingQueue<>(queueSize > 0 ? queueSize : 1);
}
case PRIORITY: {
return new PriorityBlockingQueue<>();
}
case SYNCHRONOUS: {
return new SynchronousQueue<>(options.get(prefix + ".QueueFair", false));
}
default: {
// The Options class will throw an error if the user supplies an unknown enum string
// The only way we can reach this is if we add a new QueueType element and forget to
// implement it in the above switch statement.
throw new IllegalArgumentException("Unknown QueueType type: " + this);
}
}
}
@Override
public boolean removePendingMessage(String messageUUID)
{
for (PriorityBlockingQueue<OutboundMessage> q : queueMap.values())
{
for (OutboundMessage m : q)
{
if (m.getId().equalsIgnoreCase(messageUUID))
{
if (q.remove(m))
{
deletePendingMessage(m.getGatewayId(), messageUUID);
return true;
}
}
}
}
return false;
}
/**
* timed poll transfers elements across Executor tasks
*/
public void testPollInExecutor() {
final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
final CheckedBarrier threadsStarted = new CheckedBarrier(2);
final ExecutorService executor = Executors.newFixedThreadPool(2);
try (PoolCleaner cleaner = cleaner(executor)) {
executor.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertNull(q.poll());
threadsStarted.await();
assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
checkEmpty(q);
}});
executor.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
threadsStarted.await();
q.put(one);
}});
}
}
@Test
public void testCacheJwt() throws NoSuchFieldException, IllegalAccessException {
Assert.assertNotNull(cacheStrategy.getCachedJwt(new Jwt.Key(initJwts.get(0).getScopes())));
Field field = LongestExpireCacheStrategy.class.getDeclaredField("expiryQueue");
field.setAccessible(true);
PriorityBlockingQueue cachedQueue = (PriorityBlockingQueue) field.get(cacheStrategy);
Field field1 = LongestExpireCacheStrategy.class.getDeclaredField("cachedJwts");
field1.setAccessible(true);
ConcurrentHashMap<Jwt.Key, Jwt> cachedJwts = (ConcurrentHashMap) field1.get(cacheStrategy);
Assert.assertEquals(cachedJwts.size(), 4);
Assert.assertEquals(cachedQueue.size(), 4);
ArrayList<Jwt> jwts = createJwts(2, initExpiryTime + 10);
Jwt jwt5 = jwts.get(0);
Jwt jwt1 = cachedJwts.get(cachedQueue.peek());
long originalExpiry = jwt1.getExpire();
Assert.assertEquals(cachedJwts.get(new Jwt.Key(jwt1.getScopes())), jwt1);
cacheStrategy.cacheJwt(new Jwt.Key(jwt5.getScopes()), jwt5);
Assert.assertEquals(cachedJwts.get(new Jwt.Key(jwt5.getScopes())), jwt5);
Assert.assertNotEquals(cachedJwts.get(new Jwt.Key(jwt5.getScopes())).getExpire(), originalExpiry);
}
/**
* drainTo empties queue
*/
public void testDrainToWithActivePut() throws InterruptedException {
final PriorityBlockingQueue q = populatedQueue(SIZE);
Thread t = new Thread(new CheckedRunnable() {
public void realRun() {
q.put(new Integer(SIZE + 1));
}});
t.start();
ArrayList l = new ArrayList();
q.drainTo(l);
assertTrue(l.size() >= SIZE);
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
t.join();
assertTrue(q.size() + l.size() >= SIZE);
}
@Test
public void whenPollingEmptyQueue_thenShouldBlockThread() throws InterruptedException {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
final Thread thread = new Thread(() -> {
LOG.debug("Polling...");
while (true) {
try {
Integer poll = queue.take();
LOG.debug("Polled: " + poll);
} catch (InterruptedException ignored) {
}
}
});
thread.start();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
LOG.debug("Adding to queue");
queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
/**
* Start the log thread.
*/
@Override
protected void init() {
this.printers = new ArrayList<>();
this.queue = new PriorityBlockingQueue<>(
SystemProperties.getInteger(SystemProperties.Log.QUEUE_INITIAL_SIZE),
(o1, o2) -> (int)(o1.getDate().getTime() - o2.getDate().getTime()));
this.logMonitor = new Object();
this.shuttingDown = false;
for (int i = 0; i < SystemProperties.getInteger(SystemProperties.Log.LOG_CONSUMERS_SIZE); i++) {
fork(new LogRunnable());
}
List<String> logConsumers = SystemProperties.getList(SystemProperties.Log.CONSUMERS);
logConsumers.forEach(S -> {
try {
LogPrinter printer = (LogPrinter) Class.forName(S).getConstructor().newInstance();
registerConsumer(printer);
} catch (Exception ex){
ex.printStackTrace();
}
});
}
public AStarPathSearch(AStarPathSearchProvider provider, BlockLocation start, BlockLocation end) {
this.provider = provider;
this.start = start;
this.end = end;
heuristic = provider.getHeuristic();
physics = provider.getWorldPhysics();
nodeWorld = new HashMap<BlockLocation, PathNode>();
first = new BlockPathNode(this, start);
first.setCost(0);
first.setCostEstimate(heuristic.calculateCost(start, end));
openSet = new PriorityBlockingQueue<>(64, PATH_NODE_COMPARATOR);
closedSet = new PriorityBlockingQueue<>(64, PATH_NODE_COMPARATOR);
nodeWorld.put(start, first);
openSet.offer(first);
nodeWorldReverse = new HashMap<BlockLocation, PathNode>();
last = new BlockPathNode(this, end);
last.setCost(0);
last.setCostEstimate(heuristic.calculateCost(end, start));
openSetReverse = new PriorityBlockingQueue<>(64, PATH_NODE_COMPARATOR);
closedSetReverse = new PriorityBlockingQueue<>(64, PATH_NODE_COMPARATOR);
nodeWorldReverse.put(end, last);
openSetReverse.offer(last);
}
public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
Comparator<? super T> comparator) {
super(initCapacity, comparator);
this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) {
private static final long serialVersionUID = -6805567216580184701L;
@Override
public boolean offer(T t) {
lock.lock();
try {
notEmpty.signal();
return super.offer(t);
} finally {
lock.unlock();
}
}
};
}
public NonTransactionalAdapterQueue(Collection<String> adapterIds, AdapterCallPersisterFactory persistence, int transientQueueLength, int triggerReloadQueueLength, TransactionController ctrl, Batcher batcher) {
this.transientQueueLength = transientQueueLength;
this.triggerReloadQueueLength = triggerReloadQueueLength;
this.persistence = persistence;
this.adapterIds = new ArrayList<String>(adapterIds);
this.ctrl = ctrl;
this.batcher = batcher;
this.queue = new PriorityBlockingQueue<AdapterCall>(transientQueueLength, new Comparator<AdapterCall>() {
@Override
public int compare(AdapterCall o1, AdapterCall o2) {
if (o1.getPriority() < o2.getPriority())
return -1;
if (o1.getPriority() > o2.getPriority())
return 1;
return 0;
}
});
this.reloadThread = new ReloadThread();
}
/**
* Creates a {@code PriorityBlockingQueue} containing the given elements.
*
* <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
* this priority queue will be ordered according to the same ordering.
*
* @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
*/
public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue(Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new PriorityBlockingQueue<E>(Collections2.cast(elements));
}
PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
Iterables.addAll(queue, elements);
return queue;
}
public WorldGen(World w)
{
world = w;
islandCache = Collections.synchronizedMap(new ConcurrentHashMap<Integer, CachedIsland>());
mapQueue = new PriorityBlockingQueue<Integer>();
buildThreads = new ThreadBuild[TFCOptions.maxThreadsForIslandGen];
EMPTY_MAP = new IslandMap(ISLAND_SIZE, 0);
IslandParameters ip = createParams(0, -2, 0);
ip.setIslandTemp(ClimateTemp.TEMPERATE);
ip.setIslandMoisture(Moisture.HIGH);
EMPTY_MAP.newIsland(ip);
EMPTY_MAP.generateFake();
}
@SuppressWarnings("unchecked")
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
timer = new AtomicLong( in.readLong() );
PriorityBlockingQueue<DefaultTimerJobInstance> tmp = (PriorityBlockingQueue<DefaultTimerJobInstance>) in.readObject();
if ( tmp != null ) {
queue = tmp;
}
session = ((DroolsObjectInputStream) in).getWorkingMemory();
}
public ChainChannelHandler(ChannelHandlerChain chain, ThreadPoolExecutor executor) {
this.chain = chain;
this.executor = executor;
if (executor != null) {
BlockingQueue queue = executor.getQueue();
if (queue instanceof PriorityBlockingQueue && ((PriorityBlockingQueue) queue).comparator() == null) {
runFunc = ComparableRunnable::new;
} else {
runFunc = (r) -> r;
}
}
}
BoundCommandPool(final int poolSize, Tracer tracer) {
this.executorService = new ContextMigratingExecutorService<>(new ThreadPoolExecutor(
poolSize, poolSize, // limited pool of threads
0, TimeUnit.SECONDS, // doesn't matter as number of threads never exceeds core size
new PriorityBlockingQueue<>(),
new NamedThreadFactory("bound-command")
), tracer);
}
public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
this.name = name;
this.pointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, name);
this.arguments = arguments;
this.receiverRegistry = receiverRegistry;
this.mockChannel = mockChannel;
messages = new PriorityBlockingQueue<>(11, new MessageComparator(arguments));
executorService = new RestartableExecutorService(() -> Executors.newFixedThreadPool(1, new NamedThreadFactory(i -> name + "_queue_consuming")));
start();
}
public PriorityTaskDispatch() {
PriorityBlockingQueue<Runnable> priorityBlockingQueue =
new PriorityBlockingQueue<>
(60, new PriorityTaskComparator<Runnable>());
mExecutorService = new ThreadPoolExecutor(
1,
1,
20,
TimeUnit.SECONDS,
priorityBlockingQueue,
threadFactory("Priority Dispatcher", false));
}
private static void init(){
threadCount = Runtime.getRuntime().availableProcessors() + 1;
executorService = Executors.newFixedThreadPool(threadCount);
channelMap = new ConcurrentHashMap<>(Broadcast.CHANNEL_INITIAL_CAPACITY);
channel = new DefaultChannel<PriorityBlockingQueue<WeakReference<ChannelPost>>,
ConcurrentHashMap<Integer,WeakReference<Object>>>(
Channel.DEFAULT_CHANNEL_ID,
ChannelState.OPEN,
ChannelType.DEFAULT,
new PriorityBlockingQueue<>(Channel.MSG_QUEUE_INITIAL_CAPACITY,
new Comparator<WeakReference<ChannelPost>>() {
@Override
public int compare(WeakReference<ChannelPost> o1, WeakReference<ChannelPost> o2) {
ChannelPost post1 = o1.get();
ChannelPost post2 = o2.get();
if(post1 != null || post2 != null
|| post1.getPriority() != null
|| post2.getPriority() != null){
return post1.getPriority().compareTo(post2.getPriority());
}else{
return 0;
}
}
}),
new ConcurrentHashMap<Integer,WeakReference<Object>>(Channel.SUBSCRIBER_INITIAL_CAPACITY));
channelMap.put(Channel.DEFAULT_CHANNEL_ID, channel);
broadcastCenter = new BroadcastCenter(channelMap, executorService);
}
BlockMover(int numMovingThreads, int maxQueueSize,
boolean simulate, int alwaysSubmitPriorityLevel, Configuration conf) throws IOException {
this.movingQueue = new PriorityBlockingQueue<Runnable>(
1000, new BlockMoveActionComparator());
ThreadFactory factory = new ThreadFactory() {
final AtomicInteger numThreads = new AtomicInteger();
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("BLockMoveExecutor-" + numThreads.getAndIncrement());
return t;
}
};
this.executor = new ThreadPoolExecutor(numMovingThreads,
numMovingThreads, 0L, TimeUnit.MILLISECONDS, movingQueue, factory);
this.maxQueueSize = maxQueueSize;
this.metrics = RaidNodeMetrics.getInstance(RaidNodeMetrics.DEFAULT_NAMESPACE_ID);
this.cluster = new ClusterInfo();
this.clusterUpdater = new Thread(cluster);
this.simulate = simulate;
this.rand = new Random();
this.conf = conf;
this.alwaysSubmitPriorityLevel = alwaysSubmitPriorityLevel;
this.dataTransferProtocolVersion = RaidUtils.getDataTransferProtocolVersion(conf);
}
private void fetchListing(final ProcessContext context, final ProcessSession session, final FileTransfer transfer) throws IOException {
BlockingQueue<FileInfo> queue = fileQueueRef.get();
if (queue == null) {
final boolean useNaturalOrdering = context.getProperty(FileTransfer.USE_NATURAL_ORDERING).asBoolean();
queue = useNaturalOrdering ? new PriorityBlockingQueue<FileInfo>(25000) : new LinkedBlockingQueue<FileInfo>(25000);
fileQueueRef.set(queue);
}
final StopWatch stopWatch = new StopWatch(true);
final List<FileInfo> listing = transfer.getListing();
final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
int newItems = 0;
mutuallyExclusiveTransferLock.lock();
try {
for (final FileInfo file : listing) {
if (!queue.contains(file) && !processing.contains(file)) {
if (!queue.offer(file)) {
break;
}
newItems++;
}
}
} finally {
mutuallyExclusiveTransferLock.unlock();
}
getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
new Object[]{millis, listing.size(), newItems});
}
/**
*
* @return Queue for evict partitions.
*/
private Queue<PartitionEvictionTask> createEvictPartitionQueue() {
switch (QUEUE_TYPE) {
case 1:
return new PriorityBlockingQueue<>(
1000, Comparator.comparingLong(p -> p.part.fullSize()));
default:
return new LinkedBlockingQueue<>();
}
}
@Test
public void queueDeepTest() {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(10);
for (int i = 0; i < 100; i++) {
queue.put(i);
}
System.out.println(queue.size());
}
private PBQueueSpliterator(PriorityBlockingQueue<E> queue,
Object[] array, int index, int fence) {
this.queue = queue;
this.array = array;
this.index = index;
this.fence = fence;
}
@Override
public Collection<OutboundMessage> getPendingMessages(String gatewayId)
{
PriorityBlockingQueue<OutboundMessage> queue = queueMap.get(gatewayId);
if (queue == null) return new ArrayList<OutboundMessage>();
return new ArrayList<OutboundMessage>(queue);
}
/**
* containsAll(c) is true when c contains a subset of elements
*/
public void testContainsAll() {
PriorityBlockingQueue q = populatedQueue(SIZE);
PriorityBlockingQueue p = new PriorityBlockingQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertTrue(q.containsAll(p));
assertFalse(p.containsAll(q));
p.add(new Integer(i));
}
assertTrue(p.containsAll(q));
}