下面列出了 java.util.Queue API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Assigns a failure node to every node reachable from {@code root} by walking
 * the nodes breadth-first.
 */
private void initialize() {
    // Perform a BFS to build the failure links.
    final Queue<Node> pending = new LinkedList<>();
    pending.add(root);
    root.setFailureNode(null);
    while (!pending.isEmpty()) {
        final Node parent = pending.poll();
        for (int symbol = 0; symbol < 256; symbol++) {
            final Node child = parent.getNeighbor(symbol);
            if (child == null) {
                continue;
            }
            // Walk the parent's failure chain until some node has a neighbor for this symbol.
            Node fallback = parent.getFailureNode();
            while (fallback != null && fallback.getNeighbor(symbol) == null) {
                fallback = fallback.getFailureNode();
            }
            if (fallback == null) {
                // The chain ran out: fall back to the root.
                child.setFailureNode(root);
            } else {
                child.setFailureNode(fallback.getNeighbor(symbol));
            }
            pending.add(child);
        }
    }
}
@Test
public void scanOverlapSubscriberSmallBuffered() {
    @SuppressWarnings("unchecked")
    Queue<FluxIdentityProcessor<Integer>> mockedQueue = Mockito.mock(Queue.class);
    CoreSubscriber<Flux<Integer>> downstream = new LambdaSubscriber<>(null, e -> {}, null, null);
    FluxWindow.WindowOverlapSubscriber<Integer> subscriber =
            new FluxWindow.WindowOverlapSubscriber<Integer>(downstream, 3, 3, Queues.unbounded(), mockedQueue);

    // The mocked queue reports a size two below Integer.MAX_VALUE.
    when(mockedQueue.size()).thenReturn(Integer.MAX_VALUE - 2);

    // One element is offered before scanning; the expected totals below are the stubbed size plus one.
    subscriber.offer(Processors.unicast());

    final int expectedBuffered = Integer.MAX_VALUE - 1;
    assertThat(subscriber.scan(Scannable.Attr.BUFFERED)).isEqualTo(expectedBuffered);
    assertThat(subscriber.scan(Scannable.Attr.LARGE_BUFFERED)).isEqualTo(Integer.MAX_VALUE - 1L);
}
/**
 * Runs the performance test 20 times against a single queue and prints a summary
 * line with the average of the last 10 results.
 *
 * @param args {@code args[0]} is parsed as an int and passed to the queue factory
 * @throws Exception if a run fails
 */
public static void main(final String[] args) throws Exception {
    System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
    final Queue<Integer> queue =
            SPSCQueueFactory.createQueue(Integer.parseInt(args[0]), Integer.getInteger("scale", 17));

    final long[] results = new long[20];
    int run = 0;
    while (run < 20) {
        System.gc();
        results[run] = performanceRun(run, queue);
        run++;
    }

    // Only the last 10 results contribute to the summary average.
    long total = 0;
    for (int idx = results.length - 1; idx >= 10; idx--) {
        total += results[idx];
    }
    System.out.format("summary,QueuePerfTest3,%s,%d\n", queue.getClass().getSimpleName(), total / 10);
}
/**
 * Transfers the buffered messages of {@code key} into the send queue of
 * {@code target}, wrapping each one in a {@code Tuple} together with the key.
 * Once nothing is left for the key, its entry is removed from
 * {@code messagesPerTarget}.
 *
 * @param target target for which the move needs to be done
 * @param messagesPerTarget messages for given target
 * @param key the key to be moved
 * @return true if every message for the key was moved and the entry was removed;
 *         false as soon as the send queue rejects a message
 */
protected boolean moveMessageToSendQueue(int target, Map<Object, Queue<Object>> messagesPerTarget,
                                         Object key) {
    Queue<Object> outgoing = sendQueue.get(target);
    Queue<Object> buffered = messagesPerTarget.get(key);
    // Peek first and only poll after a successful offer, so a rejected message stays buffered.
    for (Object head = buffered.peek(); head != null; head = buffered.peek()) {
        if (!outgoing.offer(new Tuple(key, head))) {
            return false;
        }
        buffered.poll();
    }
    if (!messagesPerTarget.get(key).isEmpty()) {
        return false;
    }
    messagesPerTarget.remove(key);
    return true;
}
/**
 * Collects the classes of the named package and of all its sub-packages,
 * visiting packages in first-in-first-out order.
 *
 * @param packageName name of the package to start from
 * @param pojoClassFilter filter combined into the final filter chain applied to each package
 * @return the classes accepted by the final filter chain
 */
public List<PojoClass> getPojoClassesRecursively(final String packageName, final PojoClassFilter pojoClassFilter) {
    final List<PojoClass> collected = new LinkedList<PojoClass>();
    final PojoClassFilter filterChain = getFinalFilterChain(pojoClassFilter);
    final Queue<PojoPackage> toVisit = new ConcurrentLinkedQueue<PojoPackage>();
    toVisit.add(PojoPackageFactory.getPojoPackage(packageName));
    // The queue starts with exactly one package, so the body always runs at least once.
    do {
        final PojoPackage current = toVisit.remove();
        toVisit.addAll(current.getPojoSubPackages());
        collected.addAll(current.getPojoClasses(filterChain));
    } while (!toVisit.isEmpty());
    return collected;
}
/**
 * Issues {@code bucketCount} create-bucket calls, each with a fresh random
 * bucket name and transaction id. On every even iteration the bucket is
 * deleted again and the delete response is recorded; otherwise the create
 * response is recorded.
 *
 * @param volumeName volume the buckets belong to
 * @param bucketCount number of iterations to run
 * @param deleteBucketQueue receives the delete responses
 * @param bucketQueue receives the create responses of buckets that were not deleted
 */
private void doMixTransactions(String volumeName, int bucketCount,
    Queue<OMBucketDeleteResponse> deleteBucketQueue,
    Queue<OMBucketCreateResponse> bucketQueue) {
    for (int iteration = 0; iteration < bucketCount; iteration++) {
        final String bucketName = UUID.randomUUID().toString();
        final long createTrxId = trxId.incrementAndGet();
        final OMBucketCreateResponse created = createBucket(volumeName, bucketName, createTrxId);

        if (iteration % 2 != 0) {
            bucketQueue.add(created);
            continue;
        }
        // Every second transaction also deletes the bucket that was just created.
        final OMBucketDeleteResponse deleted =
            (OMBucketDeleteResponse) deleteBucket(volumeName, bucketName, trxId.incrementAndGet());
        deleteBucketQueue.add(deleted);
    }
}
/**
 * Sums the values of the nodes on the last level of the tree.
 *
 * @param root root of the tree, may be {@code null}
 * @return the sum of the values on the deepest level, or 0 for a {@code null} root
 */
public int deepestLeavesSum(TreeNode root) {
    if (root == null) {
        return 0;
    }
    Queue<TreeNode> level = new LinkedList<>();
    level.add(root);
    int levelSum = 0;
    while (!level.isEmpty()) {
        // Restart the sum for each level; the value left after the last level is the answer.
        levelSum = 0;
        for (int remaining = level.size(); remaining > 0; remaining--) {
            TreeNode current = level.poll();
            levelSum += current.val;
            if (current.left != null) {
                level.add(current.left);
            }
            if (current.right != null) {
                level.add(current.right);
            }
        }
    }
    return levelSum;
}
/**
 * Constructs the manager that implements the {@link CacheTransactionManager} interface.
 * Only one instance per {@link com.gemstone.gemfire.cache.Cache}.
 *
 * @param cachePerfStats statistics object stored on this manager
 * @param logWriter log writer stored on this manager
 * @param cache owning cache; its distributed system supplies the distribution
 *        manager, and the distributed system plus the cache's cancel criterion
 *        are handed to the finished-transaction map
 */
public TXManagerImpl(CachePerfStats cachePerfStats, LogWriterI18n logWriter,
GemFireCacheImpl cache) {
this.cache = cache;
this.dm = cache.getDistributedSystem().getDistributionManager();
this.cachePerfStats = cachePerfStats;
this.logWriter = logWriter;
// Hosted transaction states, keyed by TXId; created with an initial capacity of 128.
this.hostedTXStates = new CustomEntryConcurrentHashMap<TXId, TXStateProxy>(
128, CustomEntryConcurrentHashMap.DEFAULT_LOAD_FACTOR,
TXMAP_CONCURRENCY);
this.suspendedTXs = new ConcurrentHashMap<TXId, TXStateInterface>();
this.finishedTXStates = new TXFinishedMap(cache.getDistributedSystem(),
cache.getCancelCriterion());
// Per-transaction queues of threads and per-transaction timer tasks, both initially empty.
this.waitMap = new ConcurrentHashMap<TransactionId, Queue<Thread>>();
this.expiryTasks = new ConcurrentHashMap<TransactionId, SystemTimerTask>();
}
/**
 * Computes the average node value of every level of the tree, top level first.
 *
 * @param root root of the tree, may be {@code null}
 * @return one average per level; an empty list when {@code root} is {@code null}
 */
public List<Double> averageOfLevels(TreeNode root) {
    List<Double> result = new ArrayList<>();
    // Guard: LinkedList accepts a null element, so a null root used to be enqueued
    // and then dereferenced, failing with a NullPointerException.
    if (root == null) {
        return result;
    }
    Queue<TreeNode> queue = new LinkedList<>();
    queue.add(root);
    while (!queue.isEmpty()) {
        // Only the nodes already queued belong to the current level.
        int size = queue.size();
        double sum = 0;
        for (int i = 0; i < size; i++) {
            TreeNode tmp = queue.poll();
            sum += tmp.val;
            if (tmp.left != null) {
                queue.add(tmp.left);
            }
            if (tmp.right != null) {
                queue.add(tmp.right);
            }
        }
        result.add(sum / size);
    }
    return result;
}
/**
 * Sends a setup-append and an append over a client connection backed by an
 * embedded channel, disconnects the channel, and expects the reply
 * processor's failure flag to be set.
 */
@Test
public void testAppendThrows() throws Exception {
ReplyProcessor processor = new ReplyProcessor();
Flow flow = new Flow(10, 0);
FlowHandler flowHandler = new FlowHandler("testConnection");
@Cleanup
ClientConnection clientConnection = flowHandler.createFlow(flow, processor);
EmbeddedChannel embeddedChannel = createChannelWithContext(flowHandler);
// Run scheduled and pending channel tasks before inspecting the outbound messages.
embeddedChannel.runScheduledPendingTasks();
embeddedChannel.runPendingTasks();
Queue<Object> messages = embeddedChannel.outboundMessages();
// Exactly one outbound message is expected at this point.
assertEquals(1, messages.size());
clientConnection.send(new WireCommands.SetupAppend(1, new UUID(1, 2), "segment", ""));
embeddedChannel.runPendingTasks();
clientConnection.send(new Append("segment", new UUID(1, 2), 1, new Event(Unpooled.EMPTY_BUFFER), 2));
// Disconnect after the append was sent; the processor is expected to record a failure.
embeddedChannel.disconnect();
embeddedChannel.runPendingTasks();
assertTrue(processor.falure.get());
}
/**
 * Finds the constituent for the span {@code [i, j]} inside {@code tree}, whose
 * own span is {@code [start, end]}. The children are scanned left to right
 * while tracking each child's span from its yield size; the first child whose
 * span covers {@code [i, j]} is searched recursively. When no child covers the
 * span, the constituent of {@code tree} itself is returned.
 *
 * @param tree subtree to search
 * @param start start position of {@code tree}'s span
 * @param end end position of {@code tree}'s span
 * @param i start of the requested span
 * @param j end of the requested span
 * @return the constituent found for the requested span
 */
private static <L> Constituent<L> getLeastCommonAncestorConstituentHelper(
        Tree<L> tree, int start, int end, int i, int j) {
    if (start == i && end == j) {
        return new Constituent<L>(tree.getLabel(), start, end);
    }
    Queue<Tree<L>> children = new LinkedList<Tree<L>>(tree.getChildren());
    int childStart = start;
    while (!children.isEmpty()) {
        Tree<L> child = children.remove();
        List<L> childYield = child.getYield();
        final int childEnd = childStart + childYield.size();
        if (childStart <= i && childEnd >= j) {
            final Constituent<L> nested =
                    getLeastCommonAncestorConstituentHelper(child, childStart, childEnd, i, j);
            if (nested == null) {
                break;
            }
            return nested;
        }
        // The next child's span begins where this one ended.
        childStart = childEnd;
    }
    return new Constituent<L>(tree.getLabel(), start, end);
}
/**
 * Exercises the given queue with a bulk fill/drain followed by many short
 * add/poll bursts, then records a per-operation time under the queue's simple
 * class name in {@code results}.
 *
 * @param q queue under test; expected to be empty on entry
 */
void test(Queue<Boolean> q) {
    final long startNanos = System.nanoTime();

    // Phase 1: fill with `count` elements, then drain completely.
    for (int added = 0; added < count; added++) {
        check(q.add(Boolean.TRUE));
    }
    System.gc();
    System.gc();
    for (Boolean polled = q.poll(); polled != null; polled = q.poll()) {
        equal(polled, Boolean.TRUE);
    }
    check(q.isEmpty());

    // Phase 2: repeated bursts of three adds followed by three polls.
    for (int round = 0; round < 10 * count; round++) {
        for (int k = 0; k < 3; k++) {
            check(q.add(Boolean.TRUE));
        }
        for (int k = 0; k < 3; k++) {
            if (q.poll() != Boolean.TRUE) {
                fail();
            }
        }
    }
    check(q.isEmpty());

    String className = q.getClass().getSimpleName();
    long elapsed = System.nanoTime() - startNanos;
    int nanos = (int) ((double) elapsed / (10 * 3 * count));
    results.put(className, String.valueOf(nanos));
}
/**
 * Searches the descendants of this inspectable component, breadth-first, for
 * the one wrapping exactly the given {@code component} instance.
 *
 * @param component component to look for (compared by reference)
 * @return an inspectable wrapper around the matching descendant, or {@code null} if none matches
 */
@Nullable
public InspectableComponent getNestedInstance(Component component) {
    final Queue<DebugComponent> frontier = new LinkedList<>(mComponent.getChildComponents());
    while (!frontier.isEmpty()) {
        final DebugComponent candidate = frontier.remove();
        if (candidate.getComponent() != component) {
            // Not a match: queue its children for a later visit.
            frontier.addAll(candidate.getChildComponents());
            continue;
        }
        return new InspectableComponent(candidate);
    }
    return null;
}
/**
 * Polls {@code q}, blocking on the monitor of {@code obj} until an element is
 * available.
 *
 * @param q queue to poll
 * @return the polled element, never {@code null}
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@Override
public E waitPoll(Queue<E> q) throws InterruptedException
{
    // Fast path: no waiting and no waiter bookkeeping when an element is already there.
    E e = q.poll();
    if (e != null)
    {
        return e;
    }
    WAITERS_UPDATER.incrementAndGet(this);
    synchronized (obj)
    {
        try
        {
            while ((e = q.poll()) == null)
            {
                obj.wait();
            }
        }
        finally
        {
            // Always undo the increment, still under the monitor. Previously an
            // InterruptedException from wait() left the waiter count permanently too high.
            WAITERS_UPDATER.decrementAndGet(this);
        }
    }
    return e;
}
/**
 * Searches {@code content} and its descendants, breadth-first, for the first
 * view of one of the recognised scrollable types. Children of a matching view
 * are not explored.
 *
 * @param content view to start from
 * @param selfable whether {@code content} itself may be returned
 * @return the first matching view, or {@code null} if there is none
 */
private View findScrollableViewInternal(View content, boolean selfable) {
    Queue<View> pending = new LinkedBlockingQueue<>(Collections.singletonList(content));
    while (!pending.isEmpty()) {
        View candidate = pending.poll();
        if (candidate == null) {
            continue;
        }
        boolean eligible = selfable || candidate != content;
        boolean scrollable = candidate instanceof AbsListView
                || candidate instanceof ScrollView
                || candidate instanceof ScrollingView
                || candidate instanceof NestedScrollingChild
                || candidate instanceof NestedScrollingParent
                || candidate instanceof WebView
                || candidate instanceof ViewPager;
        if (eligible && scrollable) {
            return candidate;
        }
        if (candidate instanceof ViewGroup) {
            ViewGroup group = (ViewGroup) candidate;
            for (int childIndex = 0; childIndex < group.getChildCount(); childIndex++) {
                pending.add(group.getChildAt(childIndex));
            }
        }
    }
    return null;
}
/**
 * Similar to {@link #fullDuplexCall}, except that it waits for all streaming requests to be
 * received before starting the streaming responses.
 *
 * @param responseObserver observer wrapped by the response dispatcher
 * @return the observer that receives the streaming requests
 */
@Override
public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
// Chunks accumulate here while requests arrive; nothing is dispatched until onCompleted.
final Queue<Chunk> chunks = new ArrayDeque<>();
return new StreamObserver<StreamingOutputCallRequest>() {
@Override
public void onNext(StreamingOutputCallRequest request) {
// Buffer only: convert the request to chunks and append them.
chunks.addAll(toChunkQueue(request));
}
@Override
public void onCompleted() {
// Dispatch all of the chunks in one shot.
dispatcher.enqueue(chunks).completeInput();
}
@Override
public void onError(Throwable cause) {
// Forward the failure to the dispatcher.
dispatcher.onError(cause);
}
};
}
/**
 * Creates one empty feeder queue per board column and resets the matching
 * entry of {@code sizes} to zero.
 */
private void init() {
    feederQueue = new ArrayList<Queue<Species>>(6);
    for (int col = 0; col < Board.NUM_COLS; col++) {
        feederQueue.add(new LinkedList<Species>());
        sizes[col] = 0;
    }
}
/**
 * Builds a {@code {spec, queue}} pair for the given parameters.
 *
 * @param producers passed to the queue spec
 * @param consumers passed to the queue spec
 * @param capacity passed to the queue spec
 * @param ordering passed to the queue spec
 * @param q queue to use, or {@code null} to have one created from the spec
 * @return a two-element array holding the spec followed by the queue
 */
public static Object[] makeMpq(int producers, int consumers, int capacity, Ordering ordering, Queue<Integer> q)
{
    final ConcurrentQueueSpec spec =
            new ConcurrentQueueSpec(producers, consumers, capacity, ordering, Preference.NONE);
    final Queue<Integer> queue;
    if (q != null)
    {
        queue = q;
    }
    else
    {
        queue = QueueFactory.newQueue(spec);
    }
    return new Object[] {spec, queue};
}
/**
 * Removes and returns the head of the queue registered under {@code identifier}.
 *
 * @param identifier key of the queue to poll
 * @return the polled invocation, or {@code null} if no queue is registered or it is empty
 */
@Nullable
public Invocation dequeue(Object identifier) {
    synchronized (queues) {
        final Queue<Invocation> pending = queues.get(identifier);
        if (pending == null) {
            return null;
        }
        return pending.poll();
    }
}
/**
 * A queue that went through a serialization round trip is a distinct object
 * with the same elements in the same order.
 */
public void testSerialization() throws Exception {
    Queue original = populatedQueue(SIZE);
    Queue copy = serialClone(original);

    assertNotSame(original, copy);
    assertEquals(original.size(), copy.size());
    assertEquals(original.toString(), copy.toString());
    assertTrue(Arrays.equals(original.toArray(), copy.toArray()));

    // Drain both queues in lockstep and compare element by element.
    for (; !original.isEmpty(); ) {
        assertFalse(copy.isEmpty());
        assertEquals(original.remove(), copy.remove());
    }
    assertTrue(copy.isEmpty());
}
/**
 * Benchmark body: offers {@code TOKEN} to the queue {@code CAPACITY} times per invocation.
 */
@GenerateMicroBenchmark
@OperationsPerInvocation(CAPACITY)
public void offer()
{
    // Work on a local reference to the queue field for the whole loop.
    final Queue<Integer> queue = q;
    int remaining = CAPACITY;
    while (remaining > 0)
    {
        queue.offer(TOKEN);
        remaining--;
    }
}
/**
 * Creates a pool backed by the given queue.
 *
 * @param queue queue holding the pooled instances
 * @param preAllocate when true, keeps offering freshly created instances until the queue rejects one
 * @param allocator creates new instances
 * @param resetter passed to the superclass together with the allocator
 */
private QueueBasedObjectPool(Queue<T> queue, boolean preAllocate, Allocator<T> allocator, Resetter<T> resetter) {
    super(allocator, resetter);
    this.queue = queue;
    if (!preAllocate) {
        return;
    }
    // NOTE(review): this terminates only if offer() eventually returns false,
    // i.e. presumably the queue is bounded - confirm against callers.
    while (queue.offer(allocator.createInstance())) {
        // keep filling
    }
}
/**
 * Builds a table host for every table name, reports the current task name
 * through {@code progress}, and queues each non-null host.
 *
 * @param ctx code generation context passed to the host builder
 * @param progress receives the task name of the table being processed
 * @param _tableHosts queue that receives the built hosts
 * @param tableViewSp task definition passed to the host builder; its id is part of the task name
 * @param tableNames tables to process
 * @param dbCategory database category passed to the host builder
 */
private void prepareTable(final JavaCodeGenContext ctx, final Progress progress,
        final Queue<JavaTableHost> _tableHosts, final TaskTableView tableViewSp, final String[] tableNames,
        final DatabaseCategory dbCategory) {
    for (int idx = 0; idx < tableNames.length; idx++) {
        final String tableName = tableNames[idx];
        final ExecuteResult result =
                new ExecuteResult("Build Table[" + tableViewSp.getAlldbs_id() + "." + tableName + "] Host");
        progress.setOtherMessage(result.getTaskName());
        final JavaTableHost host = buildTableHost(ctx, tableViewSp, tableName, dbCategory);
        // The result is flagged regardless of whether a host was produced.
        result.setSuccessal(true);
        if (host == null) {
            continue;
        }
        _tableHosts.add(host);
    }
}
/**
 * Creates a windowing publisher over {@code source}.
 *
 * @param source upstream publisher
 * @param size must be positive
 * @param skip must be positive
 * @param processorQueueSupplier must not be {@code null}
 * @param overflowQueueSupplier must not be {@code null}
 * @throws IllegalArgumentException if {@code size} or {@code skip} is not positive
 * @throws NullPointerException if either supplier is {@code null}
 */
public PublisherWindow(Publisher<? extends T> source, int size, int skip,
        Supplier<? extends Queue<T>> processorQueueSupplier,
        Supplier<? extends Queue<UnicastProcessor<T>>> overflowQueueSupplier) {
    super(source);
    // Validate everything first, in the same order as the checks are reported.
    if (size < 1) {
        throw new IllegalArgumentException("size > 0 required but it was " + size);
    }
    if (skip < 1) {
        throw new IllegalArgumentException("skip > 0 required but it was " + skip);
    }
    Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier");
    Objects.requireNonNull(overflowQueueSupplier, "overflowQueueSupplier");

    this.size = size;
    this.skip = skip;
    this.processorQueueSupplier = processorQueueSupplier;
    this.overflowQueueSupplier = overflowQueueSupplier;
}
/**
 * Creates a queue with the given capacities after clamping them to the allowed range.
 *
 * @param initialCapacity requested initial capacity; raised to {@code MIN_ALLOWED_MPSC_CHUNK_SIZE} if smaller
 * @param maxCapacity requested maximum capacity; clamped between {@code MIN_MAX_MPSC_CAPACITY}
 *        and {@code MAX_ALLOWED_QUEUE_CAPACITY}
 * @return the new queue
 */
static <T> Queue<T> newMpscQueue(final int initialCapacity, final int maxCapacity) {
    // The max capacity cannot be bigger than MAX_ALLOWED_QUEUE_CAPACITY: the
    // MpscChunkedArrayQueue implementation rounds it up to the next power of two
    // and would overflow otherwise.
    final int chunkSize = max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity);
    final int boundedMax = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_MPSC_CAPACITY);
    if (USE_UNSAFE_QUEUES) {
        return new MpscChunkedArrayQueue<>(chunkSize, boundedMax);
    }
    return new MpscGrowableAtomicArrayQueue<>(chunkSize, boundedMax);
}
/**
 * Points the manager at a nonexistent configuration resource and expects
 * initialization to report a warning through the registered callback.
 */
@Test
public void failedInitialization() {
    final Queue<String> messages = new LinkedList<>();
    SimonManager.callback().removeAllCallbacks();
    SimonManager.callback().addCallback(new CallbackSkeleton() {
        public void onManagerWarning(String warning, Exception cause) {
            messages.add(warning);
        }
    });
    System.setProperty(SimonManager.PROPERTY_CONFIG_RESOURCE_NAME, "whateverNonexistent");
    try {
        SimonManager.init();
        Assert.assertEquals(messages.poll(), "SimonManager initialization error");
    } finally {
        // Always clear the property so a failing init or assertion cannot leak
        // the bogus configuration name into later tests.
        System.getProperties().remove(SimonManager.PROPERTY_CONFIG_RESOURCE_NAME);
    }
}
/**
 * Benchmarks an {@code MpscSingleThreadExecutor} whose task queue is replaced
 * by a {@link ConcurrentLinkedQueue}.
 */
@Benchmark
public void mpscSingleThreadExecutorWithConcurrentLinkedQueue() throws InterruptedException {
    final MpscSingleThreadExecutor executor =
            new MpscSingleThreadExecutor(TIMES, new NamedThreadFactory("mpsc_clq", true)) {
                @Override
                protected Queue<Runnable> newTaskQueue(final int maxPendingTasks) {
                    // The requested bound is ignored; the queue returned here is unbounded.
                    return new ConcurrentLinkedQueue<>();
                }
            };
    execute(executor);
}
/**
 * Method creates policy alternatives according to provided model. The model structure is modified in the process.
 *
 * @param model policy source model whose root node seeds the processing
 * @return created policy alternatives resulting from policy source model.
 * @throws PolicyException declared by this method; the visible code does not show which call raises it
 */
private Collection<AssertionSet> createPolicyAlternatives(final PolicySourceModel model) throws PolicyException {
// creating global method variables
// NOTE(review): one decomposition object is reused for every decompose() call; presumably decompose() resets it - confirm.
final ContentDecomposition decomposition = new ContentDecomposition();
// creating processing queue and starting the processing iterations
final Queue<RawPolicy> policyQueue = new LinkedList<RawPolicy>();
final Queue<Collection<ModelNode>> contentQueue = new LinkedList<Collection<ModelNode>>();
final RawPolicy rootPolicy = new RawPolicy(model.getRootNode(), new LinkedList<RawAlternative>());
RawPolicy processedPolicy = rootPolicy;
// Outer loop: one pass per policy, starting with the root and continuing with queued nested policies.
do {
Collection<ModelNode> processedContent = processedPolicy.originalContent;
// Inner loop: one pass per content collection, starting with the policy's original content
// and continuing with the combinations queued below.
do {
decompose(processedContent, decomposition);
if (decomposition.exactlyOneContents.isEmpty()) {
// No exactly-one contents left: the assertions form one alternative of the current policy.
final RawAlternative alternative = new RawAlternative(decomposition.assertions);
processedPolicy.alternatives.add(alternative);
if (!alternative.allNestedPolicies.isEmpty()) {
// Nested policies are processed later by the outer loop.
policyQueue.addAll(alternative.allNestedPolicies);
}
} else { // we have a non-empty collection of exactly ones
final Collection<Collection<ModelNode>> combinations = PolicyUtils.Collections.combine(decomposition.assertions, decomposition.exactlyOneContents, false);
if (combinations != null && !combinations.isEmpty()) {
// processed alternative was split into some new alternatives, which we need to process
contentQueue.addAll(combinations);
}
}
} while ((processedContent = contentQueue.poll()) != null);
} while ((processedPolicy = policyQueue.poll()) != null);
// normalize nested policies to contain single alternative only
final Collection<AssertionSet> assertionSets = new LinkedList<AssertionSet>();
for (RawAlternative rootAlternative : rootPolicy.alternatives) {
final Collection<AssertionSet> normalizedAlternatives = normalizeRawAlternative(rootAlternative);
assertionSets.addAll(normalizedAlternatives);
}
return assertionSets;
}
/**
 * Consumes a comma-separated list of column names from {@code tokens}. For
 * every name, {@code Constants.COLUMN} followed by the name itself is appended
 * to {@code processedTokens}; the separating commas are dropped.
 *
 * @param tokens remaining input tokens, consumed from the head
 * @param processedTokens receives the processed tokens
 * @throws SQLException if a token in column-name position is not a string literal
 */
private static void processColumnNames(Queue<String> tokens,
                                       Queue<String> processedTokens) throws SQLException {
    while (true) {
        if (!ParserUtil.isStringLiteral(tokens.peek())) {
            throw new SQLException("Syntax Error : String literal is expected");
        }
        processedTokens.add(Constants.COLUMN);
        processedTokens.add(tokens.poll());
        if (!Constants.COMMA.equals(tokens.peek())) {
            return;
        }
        // Drop the comma and continue with the next column name.
        tokens.poll();
    }
}
/**
 * Returns the node values level by level, alternating the direction of each
 * level: the first level left to right, the next right to left, and so on.
 *
 * @param root root of the tree, may be {@code null}
 * @return one list per level; an empty list for a {@code null} root
 */
public List<List<Integer>> zigzagLevelOrder(TreeNode root) {
    List<List<Integer>> levels = new ArrayList<>();
    if (root == null) {
        return levels;
    }
    Queue<TreeNode> pending = new LinkedList<>();
    pending.add(root);
    boolean leftToRight = true;
    while (!pending.isEmpty()) {
        // Values of the level currently being traversed.
        List<Integer> level = new ArrayList<>();
        // Important: only the nodes enqueued during the previous round belong to this
        // level, so the queue size is captured before any children are added.
        for (int remaining = pending.size(); remaining > 0; remaining--) {
            TreeNode node = pending.poll();
            // Append when going left to right, otherwise insert at the front.
            level.add(leftToRight ? level.size() : 0, node.val);
            // Every node is handled the same way: consider both subtrees.
            if (node.left != null) {
                pending.add(node.left);
            }
            if (node.right != null) {
                pending.add(node.right);
            }
        }
        levels.add(level);
        // Switch direction for the next level.
        leftToRight = !leftToRight;
    }
    return levels;
}