下面列出了 java.util.concurrent.ArrayBlockingQueue#add() 的实例代码,你可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * drainTo(c) empties queue into another collection c.
 *
 * The queue is drained twice: once right after being populated, and once
 * after two further elements have been added, to check that it remains
 * usable after a drain.
 */
public void testDrainTo() {
    // Left raw: the element type declared by populatedQueue is not visible here.
    ArrayBlockingQueue q = populatedQueue(SIZE);
    ArrayList<Object> drained = new ArrayList<>();

    q.drainTo(drained);
    assertEquals(0, q.size());
    assertEquals(SIZE, drained.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first; Integer.valueOf replaces the deprecated constructor.
        assertEquals(Integer.valueOf(i), drained.get(i));
    }

    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));

    drained.clear();
    q.drainTo(drained);
    assertEquals(0, q.size());
    assertEquals(2, drained.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), drained.get(i));
    }
}
/**
 * iterator.remove removes current element.
 *
 * Removes the first element through an iterator, then checks with a fresh
 * iterator that exactly the two remaining elements are left, in order.
 */
public void testIteratorRemove() {
    final ArrayBlockingQueue<Object> q = new ArrayBlockingQueue<>(3);
    q.add(two);
    q.add(one);
    q.add(three);

    Iterator<Object> it = q.iterator();
    it.next();
    it.remove();

    it = q.iterator();
    // Expected value first, so a failure message reports the right roles.
    assertSame(one, it.next());
    assertSame(three, it.next());
    assertFalse(it.hasNext());
}
/**
 * iterator.remove removes current element.
 *
 * Removes the first element through an iterator, then checks with a fresh
 * iterator that exactly the two remaining elements are left, in order.
 */
public void testIteratorRemove() {
    final ArrayBlockingQueue<Object> q = new ArrayBlockingQueue<>(3);
    q.add(two);
    q.add(one);
    q.add(three);

    Iterator<Object> it = q.iterator();
    it.next();
    it.remove();

    it = q.iterator();
    // Expected value first, so a failure message reports the right roles.
    assertSame(one, it.next());
    assertSame(three, it.next());
    assertFalse(it.hasNext());
}
/**
 * drainTo(c) empties queue into another collection c.
 *
 * The queue is drained twice: once right after being populated, and once
 * after two further elements have been added, to check that it remains
 * usable after a drain.
 */
public void testDrainTo() {
    // Left raw: the element type declared by populatedQueue is not visible here.
    ArrayBlockingQueue q = populatedQueue(SIZE);
    ArrayList<Object> drained = new ArrayList<>();

    q.drainTo(drained);
    assertEquals(0, q.size());
    assertEquals(SIZE, drained.size());
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first; Integer.valueOf replaces the deprecated constructor.
        assertEquals(Integer.valueOf(i), drained.get(i));
    }

    q.add(zero);
    q.add(one);
    assertFalse(q.isEmpty());
    assertTrue(q.contains(zero));
    assertTrue(q.contains(one));

    drained.clear();
    q.drainTo(drained);
    assertEquals(0, q.size());
    assertEquals(2, drained.size());
    for (int i = 0; i < 2; ++i) {
        assertEquals(Integer.valueOf(i), drained.get(i));
    }
}
/**
 * Tests the CheckedQueue.add method. It creates a queue of {@code String}s,
 * gets the checked queue for it, and attempts to add an {@code Integer} to
 * the checked queue, which is expected to throw {@code ClassCastException}.
 */
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail1() {
    int arrayLength = 10;
    // Capacity leaves one free slot after the fill loop below
    // (presumably so a "queue full" failure cannot mask the expected exception).
    ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(arrayLength + 1);
    for (int i = 0; i < arrayLength; i++) {
        abq.add(Integer.toString(i));
    }
    // Deliberately raw: the test needs to pass a non-String past the compiler.
    Queue q = Collections.checkedQueue(abq, String.class);
    q.add(0);
}
/**
 * Tests the CheckedQueue.add method. It creates a queue of {@code String}s,
 * gets the checked queue for it, and attempts to add an {@code Integer} to
 * the checked queue, which is expected to throw {@code ClassCastException}.
 */
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail1() {
    int arrayLength = 10;
    // Capacity leaves one free slot after the fill loop below
    // (presumably so a "queue full" failure cannot mask the expected exception).
    ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(arrayLength + 1);
    for (int i = 0; i < arrayLength; i++) {
        abq.add(Integer.toString(i));
    }
    // Deliberately raw: the test needs to pass a non-String past the compiler.
    Queue q = Collections.checkedQueue(abq, String.class);
    q.add(0);
}
/**
 * Fills a queue, creates one iterator per round while rotating elements
 * through it, then empties the queue and checks every iterator created.
 *
 * Relies on the helpers randomizePutIndex, itrs, trackedIterators,
 * isDetached and assertIteratorExhausted, which are defined outside this
 * method; their exact semantics are not visible here.
 */
public void queueEmptyingWillClearItrs() {
    final boolean fair = rnd.nextBoolean();
    final int capacity = rnd.nextInt(2, 10);
    ArrayBlockingQueue q = new ArrayBlockingQueue(capacity, fair);
    randomizePutIndex(q);
    List<Iterator> iterators = new ArrayList<>();

    // Fill the queue completely before any iterator exists.
    for (int element = 0; element < capacity; element++) {
        q.add(element);
    }
    assertNull(itrs(q));

    // Each round: create an iterator, then rotate one element out and a new one in.
    for (int round = 0; round < capacity; round++) {
        iterators.add(q.iterator());
        assertEquals(trackedIterators(q), iterators);
        q.poll();
        q.add(capacity + round);
    }

    // Empty the queue.
    for (int polled = 0; polled < capacity; polled++) {
        q.poll();
    }
    assertNull(itrs(q));

    // The iterator created in round j is expected to still yield element j, and nothing more.
    for (int j = 0; j < iterators.size(); j++) {
        Iterator it = iterators.get(j);
        assertTrue(isDetached(it));
        if (rnd.nextBoolean()) {
            assertTrue(it.hasNext());
        }
        if (rnd.nextBoolean()) {
            assertEquals(it.next(), j);
            assertIteratorExhausted(it);
        }
    }
}
/**
 * Modifications do not cause iterators to fail: the queue head is removed
 * before every call to next(), and the iteration must still complete,
 * leaving the queue empty.
 */
public void testWeaklyConsistentIteration() {
    final ArrayBlockingQueue<Object> q = new ArrayBlockingQueue<>(3);
    q.add(one);
    q.add(two);
    q.add(three);

    Iterator<Object> it = q.iterator();
    while (it.hasNext()) {
        // Mutate the queue mid-iteration, then keep iterating.
        q.remove();
        it.next();
    }

    assertEquals(0, q.size());
}
/**
 * Adds a record to the bucket selected by {@code record.bucketId()},
 * creating that bucket on first use, and calls {@code full()} once the
 * bucket's size reaches the configured trigger size.
 *
 * NOTE(review): {@code ArrayBlockingQueue.add} throws
 * {@code IllegalStateException} when the queue is at capacity, and the
 * capacity is fixed at creation. Confirm the trigger size never exceeds
 * the initial bucket capacity.
 *
 * @param record the record to store
 * @return this reporter, to allow call chaining
 */
public ReporterImpl<T> addRecord(T record) {
    ArrayBlockingQueue<T> bucket = reportBuckets.computeIfAbsent(
            record.bucketId(),
            bucketId -> new ArrayBlockingQueue<>(options.getInitialBucketCapacity()));
    bucket.add(record);

    // The size read is only approximate; being slightly off is acceptable.
    boolean triggerReached = bucket.size() >= options.getBucketFullTriggerSize();
    if (triggerReached) {
        full();
    }
    return this;
}
/**
 * Demonstrates {@code drainTo}: fills a three-slot queue with 1, 2, 3,
 * prints it, drains it into a list, then prints both the emptied queue and
 * the list.
 *
 * @param args unused
 * @throws InterruptedException declared by the original signature; nothing in the body throws it
 */
public static void main(String[] args) throws InterruptedException {
    // Diamond instead of raw constructors, so the element type is checked.
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.add(1);
    queue.add(2);
    queue.add(3);

    System.out.println("Before drainTo Operation");
    System.out.println("queue = " + queue);

    ArrayList<Integer> list = new ArrayList<>();
    queue.drainTo(list);

    System.out.println("After drainTo Operation");
    System.out.println("queue = " + queue);
    System.out.println("collection = " + list);
}
/**
 * Tests the CheckedQueue.add method. It creates a queue of {@code String}s,
 * gets the checked queue for it, and attempts to add an {@code Integer} to
 * the checked queue, which is expected to throw {@code ClassCastException}.
 */
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail1() {
    int arrayLength = 10;
    // Capacity leaves one free slot after the fill loop below
    // (presumably so a "queue full" failure cannot mask the expected exception).
    ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(arrayLength + 1);
    for (int i = 0; i < arrayLength; i++) {
        abq.add(Integer.toString(i));
    }
    // Deliberately raw: the test needs to pass a non-String past the compiler.
    Queue q = Collections.checkedQueue(abq, String.class);
    q.add(0);
}
/**
 * Checks collective sanity of iteration, toArray() and toString().
 *
 * Runs three phases (fill, churn, drain) and before each mutation calls
 * checkIterationSanity (defined outside this method) and asserts that
 * size plus remaining capacity equals the capacity.
 */
public void collectiveSanity() {
    final boolean fair = rnd.nextBoolean();
    final int capacity = rnd.nextInt(10, 20);
    // The churn phase cycles one and a half times the capacity through the queue.
    final int churnRounds = capacity + (capacity >> 1);
    ArrayBlockingQueue q = new ArrayBlockingQueue(capacity, fair);
    randomizePutIndex(q);

    // Phase 1: fill the queue with 0 .. capacity-1.
    for (int next = 0; next < capacity; next++) {
        checkIterationSanity(q);
        assertEquals(capacity, q.size() + q.remainingCapacity());
        q.add(next);
    }

    // Phase 2: take the head and append a new tail, keeping the queue full.
    for (int head = 0; head < churnRounds; head++) {
        checkIterationSanity(q);
        assertEquals(capacity, q.size() + q.remainingCapacity());
        assertEquals(head, q.peek());
        assertEquals(head, q.poll());
        checkIterationSanity(q);
        assertEquals(capacity, q.size() + q.remainingCapacity());
        q.add(capacity + head);
    }

    // Phase 3: drain; the heads continue the sequence from churnRounds.
    for (int offset = 0; offset < capacity; offset++) {
        checkIterationSanity(q);
        assertEquals(capacity, q.size() + q.remainingCapacity());
        int expected = offset + churnRounds;
        assertEquals(expected, q.peek());
        assertEquals(expected, q.poll());
    }
    checkIterationSanity(q);
}
/**
 * Tests the CheckedQueue.add method. It creates a queue of {@code String}s,
 * gets the checked queue for it, and attempts to add an {@code Integer} to
 * the checked queue, which is expected to throw {@code ClassCastException}.
 */
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail1() {
    int arrayLength = 10;
    // Capacity leaves one free slot after the fill loop below
    // (presumably so a "queue full" failure cannot mask the expected exception).
    ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(arrayLength + 1);
    for (int i = 0; i < arrayLength; i++) {
        abq.add(Integer.toString(i));
    }
    // Deliberately raw: the test needs to pass a non-String past the compiler.
    Queue q = Collections.checkedQueue(abq, String.class);
    q.add(0);
}
/**
 * Tests the CheckedQueue.add method. It creates a queue of {@code String}s,
 * gets the checked queue for it, and attempts to add an {@code Integer} to
 * the checked queue, which is expected to throw {@code ClassCastException}.
 */
@Test(expectedExceptions = ClassCastException.class)
public void testAddFail1() {
    int arrayLength = 10;
    // Capacity leaves one free slot after the fill loop below
    // (presumably so a "queue full" failure cannot mask the expected exception).
    ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(arrayLength + 1);
    for (int i = 0; i < arrayLength; i++) {
        abq.add(Integer.toString(i));
    }
    // Deliberately raw: the test needs to pass a non-String past the compiler.
    Queue q = Collections.checkedQueue(abq, String.class);
    q.add(0);
}
/**
 * Data provider "collections": builds one collection of each listed type,
 * all holding the contents of {@code LambdaTestHelpers.countTo(10)}.
 *
 * @return one row per collection: {implementation class name, collection instance}
 */
@DataProvider(name = "collections")
public Object[][] createCollections() {
    List<Integer> content = LambdaTestHelpers.countTo(10);
    List<Collection<Integer>> collections = new ArrayList<>();

    collections.add(new ArrayList<>(content));
    collections.add(new LinkedList<>(content));
    collections.add(new Vector<>(content));

    collections.add(new HashSet<>(content));
    collections.add(new LinkedHashSet<>(content));
    collections.add(new TreeSet<>(content));

    // Stack has no copy constructor, so it is filled after creation.
    Stack<Integer> stack = new Stack<>();
    stack.addAll(content);
    collections.add(stack);

    collections.add(new PriorityQueue<>(content));
    collections.add(new ArrayDeque<>(content));

    // Concurrent collections
    collections.add(new ConcurrentSkipListSet<>(content));

    // The constructor used here takes a capacity, so the elements are added afterwards.
    ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(content.size());
    arrayBlockingQueue.addAll(content);
    collections.add(arrayBlockingQueue);

    collections.add(new PriorityBlockingQueue<>(content));
    collections.add(new LinkedBlockingQueue<>(content));
    collections.add(new LinkedTransferQueue<>(content));
    collections.add(new ConcurrentLinkedQueue<>(content));
    collections.add(new LinkedBlockingDeque<>(content));
    collections.add(new ConcurrentLinkedDeque<>(content));

    Object[][] params = new Object[collections.size()][];
    int row = 0;
    for (Collection<Integer> collection : collections) {
        params[row++] = new Object[]{collection.getClass().getName(), collection};
    }
    return params;
}
/**
 * 39469
 *
 * Producer/consumer run over a 5000-slot ArrayBlockingQueue. The calling
 * thread produces {@code totalSize} integers while a consumer thread
 * repeatedly drains the queue and prints the time elapsed between its
 * first non-empty drain and the moment it has seen {@code totalSize}
 * elements.
 *
 * @throws InterruptedException if interrupted while waiting for queue space
 *         or while waiting for the consumer thread to finish
 */
@Test
public void testABQ() throws InterruptedException {
    final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5000);
    Thread consumer = new Thread(new Runnable() {
        @Override
        public void run() {
            long startTime = -1;
            int dataCounter = 0;
            // One buffer reused across drains, not a fresh list per spin.
            ArrayList<Integer> data = new ArrayList<>();
            do {
                data.clear();
                queue.drainTo(data);
                if (startTime == -1 && data.size() > 0) {
                    startTime = System.currentTimeMillis();
                }
                dataCounter += data.size();
            } while (dataCounter != totalSize);
            System.out.println("time cost:" + (System.currentTimeMillis() - startTime));
        }
    });
    consumer.start();
    for (int i = 0; i < totalSize; i++) {
        // put() blocks while the queue is full. It replaces a retry loop that
        // called add() and swallowed every exception until the add succeeded.
        queue.put(i);
    }
    consumer.join();
}
/**
 * Data provider "collections": builds one collection of each listed type,
 * all holding the contents of {@code LambdaTestHelpers.countTo(10)}.
 *
 * @return one row per collection: {implementation class name, collection instance}
 */
@DataProvider(name = "collections")
public Object[][] createCollections() {
    List<Integer> content = LambdaTestHelpers.countTo(10);
    List<Collection<Integer>> collections = new ArrayList<>();

    collections.add(new ArrayList<>(content));
    collections.add(new LinkedList<>(content));
    collections.add(new Vector<>(content));

    collections.add(new HashSet<>(content));
    collections.add(new LinkedHashSet<>(content));
    collections.add(new TreeSet<>(content));

    // Stack has no copy constructor, so it is filled after creation.
    Stack<Integer> stack = new Stack<>();
    stack.addAll(content);
    collections.add(stack);

    collections.add(new PriorityQueue<>(content));
    collections.add(new ArrayDeque<>(content));

    // Concurrent collections
    collections.add(new ConcurrentSkipListSet<>(content));

    // The constructor used here takes a capacity, so the elements are added afterwards.
    ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(content.size());
    arrayBlockingQueue.addAll(content);
    collections.add(arrayBlockingQueue);

    collections.add(new PriorityBlockingQueue<>(content));
    collections.add(new LinkedBlockingQueue<>(content));
    collections.add(new LinkedTransferQueue<>(content));
    collections.add(new ConcurrentLinkedQueue<>(content));
    collections.add(new LinkedBlockingDeque<>(content));
    collections.add(new ConcurrentLinkedDeque<>(content));

    Object[][] params = new Object[collections.size()][];
    int row = 0;
    for (Collection<Integer> collection : collections) {
        params[row++] = new Object[]{collection.getClass().getName(), collection};
    }
    return params;
}
/**
 * Data provider "collections": builds one collection of each listed type,
 * all holding the contents of {@code LambdaTestHelpers.countTo(10)}.
 *
 * @return one row per collection: {implementation class name, collection instance}
 */
@DataProvider(name = "collections")
public Object[][] createCollections() {
    List<Integer> content = LambdaTestHelpers.countTo(10);
    List<Collection<Integer>> collections = new ArrayList<>();

    collections.add(new ArrayList<>(content));
    collections.add(new LinkedList<>(content));
    collections.add(new Vector<>(content));

    collections.add(new HashSet<>(content));
    collections.add(new LinkedHashSet<>(content));
    collections.add(new TreeSet<>(content));

    // Stack has no copy constructor, so it is filled after creation.
    Stack<Integer> stack = new Stack<>();
    stack.addAll(content);
    collections.add(stack);

    collections.add(new PriorityQueue<>(content));
    collections.add(new ArrayDeque<>(content));

    // Concurrent collections
    collections.add(new ConcurrentSkipListSet<>(content));

    // The constructor used here takes a capacity, so the elements are added afterwards.
    ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(content.size());
    arrayBlockingQueue.addAll(content);
    collections.add(arrayBlockingQueue);

    collections.add(new PriorityBlockingQueue<>(content));
    collections.add(new LinkedBlockingQueue<>(content));
    collections.add(new LinkedTransferQueue<>(content));
    collections.add(new ConcurrentLinkedQueue<>(content));
    collections.add(new LinkedBlockingDeque<>(content));
    collections.add(new ConcurrentLinkedDeque<>(content));

    Object[][] params = new Object[collections.size()][];
    int row = 0;
    for (Collection<Integer> collection : collections) {
        params[row++] = new Object[]{collection.getClass().getName(), collection};
    }
    return params;
}
/**
 * Loads and instruments the classes in {@code testClass + ".zip"}, runs the
 * named coroutine through repeated write/read/execute cycles on a dedicated
 * thread, and checks the text accumulated by the deserialized coroutine.
 *
 * @param testClass name of the coroutine class to load; also the base name of the zip resource
 * @param settings instrumentation settings passed to the loader
 * @throws Exception any failure raised on the worker thread, rethrown on the calling thread
 */
private void performDoubleCountTest(String testClass, InstrumentationSettings settings) throws Exception {
    // This test is wrapped in a new thread whose context classloader is set to the classloader of the zip
    // we're dynamically loading. We need to do this because ObjectInputStream uses the system classloader by
    // default, not the thread's classloader. CoroutineReader has been modified to use the thread's classloader
    // if the system's classloader fails.
    try (URLClassLoader classLoader = loadClassesInZipResourceAndInstrument(testClass + ".zip", settings)) {
        ArrayBlockingQueue<Throwable> threadResult = new ArrayBlockingQueue<>(1);
        Thread thread = new Thread(() -> {
            try {
                Class<Coroutine> cls = (Class<Coroutine>) classLoader.loadClass(testClass);
                Coroutine coroutine = invokeConstructor(cls, new StringBuilder());

                // Create and run original for a few cycles
                CoroutineRunner runner = new CoroutineRunner(coroutine);
                for (int cycle = 0; cycle < 10; cycle++) {
                    runner = writeReadExecute(runner);
                    assertTrue(runner.execute());
                }
                runner = writeReadExecute(runner);
                assertFalse(runner.execute()); // coroutine finished executing here
                for (int cycle = 0; cycle < 3; cycle++) {
                    runner = writeReadExecute(runner);
                    assertTrue(runner.execute());
                }

                // Assert everything continued fine with deserialized version
                Object deserializedCoroutine = readField(runner, "coroutine", true);
                StringBuilder deserializedBuilder = (StringBuilder) readField(deserializedCoroutine, "builder", true);
                assertEquals("started\n"
                        + "0.0\n"
                        + "1.0\n"
                        + "2.0\n"
                        + "3.0\n"
                        + "4.0\n"
                        + "5.0\n"
                        + "6.0\n"
                        + "7.0\n"
                        + "8.0\n"
                        + "9.0\n"
                        + "started\n"
                        + "0.0\n"
                        + "1.0\n"
                        + "2.0\n", deserializedBuilder.toString());
            } catch (Throwable e) {
                // Catch everything: an Error other than AssertionError (e.g. a LinkageError) would
                // otherwise end the worker thread without being recorded, and the test would pass.
                threadResult.add(e);
            }
        });
        thread.setContextClassLoader(classLoader);
        thread.start();
        thread.join();

        // Rethrow the worker's failure, if any, on the calling thread.
        Throwable t = threadResult.peek();
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t != null) {
            throw new RuntimeException(t);
        }
    }
}
/**
 * Data provider "collections": builds one collection of each listed type,
 * all holding the contents of {@code LambdaTestHelpers.countTo(10)}.
 *
 * @return one row per collection: {implementation class name, collection instance}
 */
@DataProvider(name = "collections")
public Object[][] createCollections() {
    List<Integer> content = LambdaTestHelpers.countTo(10);
    List<Collection<Integer>> collections = new ArrayList<>();

    collections.add(new ArrayList<>(content));
    collections.add(new LinkedList<>(content));
    collections.add(new Vector<>(content));

    collections.add(new HashSet<>(content));
    collections.add(new LinkedHashSet<>(content));
    collections.add(new TreeSet<>(content));

    // Stack has no copy constructor, so it is filled after creation.
    Stack<Integer> stack = new Stack<>();
    stack.addAll(content);
    collections.add(stack);

    collections.add(new PriorityQueue<>(content));
    collections.add(new ArrayDeque<>(content));

    // Concurrent collections
    collections.add(new ConcurrentSkipListSet<>(content));

    // The constructor used here takes a capacity, so the elements are added afterwards.
    ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(content.size());
    arrayBlockingQueue.addAll(content);
    collections.add(arrayBlockingQueue);

    collections.add(new PriorityBlockingQueue<>(content));
    collections.add(new LinkedBlockingQueue<>(content));
    collections.add(new LinkedTransferQueue<>(content));
    collections.add(new ConcurrentLinkedQueue<>(content));
    collections.add(new LinkedBlockingDeque<>(content));
    collections.add(new ConcurrentLinkedDeque<>(content));

    Object[][] params = new Object[collections.size()][];
    int row = 0;
    for (Collection<Integer> collection : collections) {
        params[row++] = new Object[]{collection.getClass().getName(), collection};
    }
    return params;
}