下面列出了 java.util.concurrent.LinkedTransferQueue 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds one instance of each concurrent queue implementation under test.
 * The two {@code ArrayBlockingQueue} variants (non-fair and fair) are sized
 * by {@code count}, which is defined outside this method.
 *
 * @return the queues, in randomized order
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // More implementations can be obtained from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html, e.g.:
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order so no implementation is systematically measured
    // first ("first fast, second slow" benchmark effect).
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * Creates a loader backed by a MySQL key/value store.
 *
 * <p>All arguments are stored on the instance. The constructor then tries to
 * build a {@code SimpleStringKeyValueStoreMySQL}; if that throws, the failure
 * is logged and {@code StringKeyValueStoreNoOp.SINGLETON} is used instead, so
 * construction itself does not fail.
 *
 * @param dataStoreConfig configuration; supplies {@code sqlBatchWrite()} for the store
 * @param queueOut output queue kept for later use
 * @param stop shared stop flag kept for later use
 * @param loadedResultsFromDBQueue queue kept for results loaded from the database
 * @param loadQueue queue kept for pending load requests
 * @param url connection URL passed to the store
 * @param user user name passed to the store
 * @param password password passed to the store
 * @param table table name passed to the store
 */
public MySQLDataStoreLoader(DataStoreConfig dataStoreConfig, DataOutputQueue queueOut, AtomicBoolean stop,
        LinkedTransferQueue<Map<String, String>> loadedResultsFromDBQueue,
        LinkedTransferQueue<List<String>> loadQueue, String url, String user, String password, String table) {
    // Collaborators and shared queues.
    this.dataStoreConfig = dataStoreConfig;
    this.queueOut = queueOut;
    this.stop = stop;
    this.loadedResultsFromDBQueue = loadedResultsFromDBQueue;
    this.loadQueue = loadQueue;

    // Connection settings.
    this.url = url;
    this.user = user;
    this.password = password;
    this.table = table;

    try {
        store = new SimpleStringKeyValueStoreMySQL(url, user, password, table, dataStoreConfig.sqlBatchWrite());
    } catch (Exception connectFailure) {
        // Degrade to the no-op store rather than failing construction.
        store = StringKeyValueStoreNoOp.SINGLETON;
        if (debug) {
            logger.warn(connectFailure, "Unable to connect to MySQL, DS will not be sending data",
                    connectFailure.getMessage());
        }
        logger.error("Unable to connect to MySQL, DS will not be sending data", connectFailure.getMessage());
    }
}
/**
 * A blocked transfer completes once another thread polls, and the
 * polling thread is the one that receives the transferred element.
 */
public void testTransfer4() throws InterruptedException {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedRunnable transferTask = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            queue.transfer(four);
            assertFalse(queue.contains(four));
            assertSame(three, queue.poll());
        }
    };
    final Thread transferrer = newStartedThread(transferTask);

    // Spin until the transferring thread's element becomes visible.
    while (queue.isEmpty()) {
        Thread.yield();
    }
    assertFalse(queue.isEmpty());
    assertEquals(1, queue.size());
    assertTrue(queue.offer(three));
    assertSame(four, queue.poll());
    awaitTermination(transferrer);
}
/**
 * Builds one instance of each concurrent queue implementation under test.
 * The two {@code ArrayBlockingQueue} variants (non-fair and fair) are sized
 * by {@code count}, which is defined outside this method.
 *
 * @return the queues, in randomized order
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // More implementations can be obtained from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html, e.g.:
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order so no implementation is systematically measured
    // first ("first fast, second slow" benchmark effect).
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * Builds one instance of each concurrent queue implementation under test.
 * The two {@code ArrayBlockingQueue} variants (non-fair and fair) are sized
 * by {@code count}, which is defined outside this method.
 *
 * @return the queues, in randomized order
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // More implementations can be obtained from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html, e.g.:
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order so no implementation is systematically measured
    // first ("first fast, second slow" benchmark effect).
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * A timed poll running in one executor task receives the element
 * that a second executor task puts into the queue.
 */
public void testPollInExecutor() {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedBarrier bothRunning = new CheckedBarrier(2);
    final ExecutorService workers = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(workers)) {
        final CheckedRunnable consumer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                // Nothing has been put yet, so an untimed poll finds nothing.
                assertNull(queue.poll());
                bothRunning.await();
                final long pollStart = System.nanoTime();
                assertSame(one, queue.poll(LONG_DELAY_MS, MILLISECONDS));
                assertTrue(millisElapsedSince(pollStart) < LONG_DELAY_MS);
                checkEmpty(queue);
            }
        };
        workers.execute(consumer);

        final CheckedRunnable producer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                bothRunning.await();
                queue.put(one);
            }
        };
        workers.execute(producer);
    }
}
/**
 * Creates a task that loops forever, on each pass randomly either offering
 * {@code ELEMENT} to, or polling from, a {@code LinkedTransferQueue} private
 * to the returned task, and incrementing {@code calls} after every operation.
 *
 * @return a never-terminating runnable
 */
Runnable newLinkedTransferQueueRunner() {
    final TransferQueue<Integer> sharedQueue = new LinkedTransferQueue<>();
    return new Runnable() {
        @Override
        public void run() {
            final ThreadLocalRandom coin = ThreadLocalRandom.current();
            while (true) {
                if (!coin.nextBoolean()) {
                    sharedQueue.poll();
                } else {
                    sharedQueue.offer(ELEMENT);
                }
                calls.increment();
            }
        }
    };
}
/**
 * Schedules a zero-delay task on {@code timer} that reports the name of the
 * thread it runs on, then checks that name against {@code expected}.
 * The timer is always cancelled before this method returns.
 *
 * @param timer the timer whose worker thread name is checked
 * @param expected the thread name the task is expected to observe
 * @throws InterruptedException if interrupted while waiting for the task's report
 * @throws AssertionError if the observed thread name differs from {@code expected}
 */
public static void test(Timer timer, String expected) throws InterruptedException {
    try {
        final LinkedTransferQueue<String> handoff = new LinkedTransferQueue<>();
        final TimerTask reportThreadName = new TimerTask() {
            public void run() {
                handoff.put(Thread.currentThread().getName());
            }
        };
        timer.schedule(reportThreadName, 0L); // zero delay: run as soon as possible

        final String observed = handoff.take();
        if (expected.equals(observed)) {
            return;
        }
        throw new AssertionError(
            String.format("expected='%s', actual='%s'", expected, observed));
    } finally {
        timer.cancel();
    }
}
/**
 * retainAll(c) keeps only the elements that are also in c, and
 * returns true exactly when the queue was modified.
 */
public void testRetainAll() {
    final LinkedTransferQueue retained = populatedQueue(SIZE);
    final LinkedTransferQueue shrinking = populatedQueue(SIZE);
    for (int removed = 0; removed < SIZE; removed++) {
        final boolean changed = retained.retainAll(shrinking);
        // Only on the first pass, before anything was removed from the
        // second queue, is no change expected.
        if (removed > 0) {
            assertTrue(changed);
        } else {
            assertFalse(changed);
        }
        assertTrue(retained.containsAll(shrinking));
        assertEquals(SIZE - removed, retained.size());
        shrinking.remove();
    }
}
/**
 * Runs the benchmark against each blocking queue implementation for an
 * increasing number of consumers, then shuts the shared pool down.
 *
 * @param args optional; {@code args[0]} is the maximum consumer count (default 5)
 * @throws Exception if a run fails
 * @throws Error if the pool does not terminate within {@code LONG_DELAY_MS} milliseconds
 */
public static void main(String[] args) throws Exception {
    final int maxConsumers;
    if (args.length > 0) {
        maxConsumers = Integer.parseInt(args[0]);
    } else {
        maxConsumers = 5;
    }

    pool = Executors.newCachedThreadPool();
    int consumers = 1;
    while (consumers <= maxConsumers) {
        // Iteration counts are tuned so a typical single run stays <= 10 ms;
        // fair queues in particular get fewer iterations.
        // An unbounded queue could legitimately hit OOME with enough
        // iterations, but the limits used here are low enough.
        run(new ArrayBlockingQueue<Integer>(100), consumers, 1000);
        run(new LinkedBlockingQueue<Integer>(100), consumers, 1000);
        run(new LinkedBlockingDeque<Integer>(100), consumers, 1000);
        run(new LinkedTransferQueue<Integer>(), consumers, 700);
        run(new PriorityBlockingQueue<Integer>(), consumers, 1000);
        run(new SynchronousQueue<Integer>(), consumers, 300);
        run(new SynchronousQueue<Integer>(true), consumers, 200);
        run(new ArrayBlockingQueue<Integer>(100, true), consumers, 100);
        // Step grows with the current count: 1, 2, 3, 5, 8, ...
        consumers += (consumers + 1) >>> 1;
    }

    pool.shutdown();
    final boolean terminated = pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS);
    if (!terminated) {
        throw new Error();
    }
    pool = null;
}
/**
 * Runs the benchmark against each blocking queue implementation for an
 * increasing number of producer/consumer pairs, then shuts the shared
 * pool down.
 *
 * @param args optional; {@code args[0]} is the maximum pair count (default 5)
 * @throws Exception if a run fails
 * @throws Error if the pool does not terminate within {@code LONG_DELAY_MS} milliseconds
 */
public static void main(String[] args) throws Exception {
    final int maxPairs = (args.length > 0)
        ? Integer.parseInt(args[0])
        : 5;
    // The former local "iters" was never read (each run() call passes its
    // own iteration count), so it has been removed.
    pool = Executors.newCachedThreadPool();
    // The step grows with the current pair count: 1, 2, 3, 5, 8, ...
    for (int i = 1; i <= maxPairs; i += (i + 1) >>> 1) {
        // Adjust iterations to limit typical single runs to <= 10 ms;
        // Notably, fair queues get fewer iters.
        // Unbounded queues can legitimately OOME if iterations
        // high enough, but we have a sufficiently low limit here.
        run(new ArrayBlockingQueue<Integer>(100), i, 500);
        run(new LinkedBlockingQueue<Integer>(100), i, 1000);
        run(new LinkedBlockingDeque<Integer>(100), i, 1000);
        run(new LinkedTransferQueue<Integer>(), i, 1000);
        run(new PriorityBlockingQueue<Integer>(), i, 1000);
        run(new SynchronousQueue<Integer>(), i, 400);
        run(new SynchronousQueue<Integer>(true), i, 300);
        run(new ArrayBlockingQueue<Integer>(100, true), i, 100);
    }
    pool.shutdown();
    if (!pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS)) {
        throw new Error();
    }
    pool = null;
}
/**
 * An element offered (with timeout) by one executor task is received
 * by a take() running in another executor task.
 */
public void testOfferInExecutor() {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedBarrier bothRunning = new CheckedBarrier(2);
    final ExecutorService workers = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(workers)) {
        final CheckedRunnable offerer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                bothRunning.await();
                final long offerStart = System.nanoTime();
                assertTrue(queue.offer(one, LONG_DELAY_MS, MILLISECONDS));
                assertTrue(millisElapsedSince(offerStart) < LONG_DELAY_MS);
            }
        };
        workers.execute(offerer);

        final CheckedRunnable taker = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                bothRunning.await();
                assertSame(one, queue.take());
                checkEmpty(queue);
            }
        };
        workers.execute(taker);
    }
}
/**
 * A blocked transfer completes once another thread polls, and the
 * polling thread is the one that receives the transferred element.
 */
public void testTransfer4() throws InterruptedException {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedRunnable transferTask = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            queue.transfer(four);
            assertFalse(queue.contains(four));
            assertSame(three, queue.poll());
        }
    };
    final Thread transferrer = newStartedThread(transferTask);

    // Spin until the transferring thread's element becomes visible.
    while (queue.isEmpty()) {
        Thread.yield();
    }
    assertFalse(queue.isEmpty());
    assertEquals(1, queue.size());
    assertTrue(queue.offer(three));
    assertSame(four, queue.poll());
    awaitTermination(transferrer);
}
/**
 * A timed poll running in one executor task receives the element
 * that a second executor task puts into the queue.
 */
public void testPollInExecutor() {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedBarrier bothRunning = new CheckedBarrier(2);
    final ExecutorService workers = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(workers)) {
        final CheckedRunnable consumer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                // Nothing has been put yet, so an untimed poll finds nothing.
                assertNull(queue.poll());
                bothRunning.await();
                final long pollStart = System.nanoTime();
                assertSame(one, queue.poll(LONG_DELAY_MS, MILLISECONDS));
                assertTrue(millisElapsedSince(pollStart) < LONG_DELAY_MS);
                checkEmpty(queue);
            }
        };
        workers.execute(consumer);

        final CheckedRunnable producer = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                bothRunning.await();
                queue.put(one);
            }
        };
        workers.execute(producer);
    }
}
/**
 * Builds one instance of each concurrent queue implementation under test.
 * The two {@code ArrayBlockingQueue} variants (non-fair and fair) are sized
 * by {@code count}, which is defined outside this method.
 *
 * @return the queues, in randomized order
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // More implementations can be obtained from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html, e.g.:
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order so no implementation is systematically measured
    // first ("first fast, second slow" benchmark effect).
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * A queue built from a collection holds exactly that collection's
 * elements, in the same order.
 */
public void testConstructor5() {
    final Integer[] source = new Integer[SIZE];
    for (int idx = 0; idx < source.length; idx++) {
        source[idx] = idx;
    }
    final List expectedContents = Arrays.asList(source);
    final LinkedTransferQueue queue = new LinkedTransferQueue(expectedContents);

    // Size, string form and every toArray variant must agree with the list.
    assertEquals(queue.size(), expectedContents.size());
    assertEquals(queue.toString(), expectedContents.toString());
    assertTrue(Arrays.equals(queue.toArray(), expectedContents.toArray()));
    assertTrue(Arrays.equals(
        queue.toArray(new Object[0]),
        expectedContents.toArray(new Object[0])));
    assertTrue(Arrays.equals(
        queue.toArray(new Object[SIZE]),
        expectedContents.toArray(new Object[SIZE])));

    // Elements come back out in insertion order.
    for (Integer expected : source) {
        assertEquals(expected, queue.poll());
    }
}
/**
 * Checks that traversal operations collapse a random pattern of
 * dead nodes as could normally only occur with a race.
 *
 * <p>The test fills a queue with 0..n-1, nulls out the item of a random
 * subset of nodes, runs the supplied traversal action, and then checks the
 * remaining node count and the queue's contents.
 *
 * @param traversalAction the traversal operation to run against the
 *        prepared queue (supplied by the "traversalActions" data provider)
 */
@Test(dataProvider = "traversalActions")
public void traversalOperationsCollapseRandomNodes(
Consumer<LinkedTransferQueue> traversalAction) {
LinkedTransferQueue q = new LinkedTransferQueue();
// Random element count; may be zero, giving an empty queue.
// NOTE(review): presumably rnd is a java.util.Random-style source, so n is in [0, 5] — confirm.
int n = rnd.nextInt(6);
for (int i = 0; i < n; i++) q.add(i);
// Items whose nodes were nulled out below.
ArrayList nulledOut = new ArrayList();
// Walk the node chain via the head/next helpers defined elsewhere in the file.
for (Object p = head(q); p != null; p = next(p))
if (rnd.nextBoolean()) {
// Record the item first, then clear it directly on the node.
// NOTE(review): ITEM looks like a VarHandle on the node's item field — confirm.
nulledOut.add(item(p));
ITEM.setVolatile(p, null);
}
traversalAction.accept(q);
int c = nodeCount(q);
// The node count must equal size(), plus one extra node when the last
// added element (n - 1) is no longer contained in the queue.
assertEquals(q.size(), c - (q.contains(n - 1) ? 0 : 1));
// Each value is either recorded as nulled out or still in the queue, never both.
for (int i = 0; i < n; i++)
assertTrue(nulledOut.contains(i) ^ q.contains(i));
}
/**
 * Builds one instance of each concurrent queue implementation under test.
 * The two {@code ArrayBlockingQueue} variants (non-fair and fair) are sized
 * by {@code count}, which is defined outside this method.
 *
 * @return the queues, in randomized order
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // More implementations can be obtained from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html, e.g.:
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order so no implementation is systematically measured
    // first ("first fast, second slow" benchmark effect).
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * drainTo(c, n) moves the first min(n, size) elements of the queue
 * into c, preserving their order.
 */
public void testDrainToN() {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    for (int limit = 0; limit < SIZE + 2; limit++) {
        // Refill with 0..SIZE-1 for this round.
        for (int value = 0; value < SIZE; value++) {
            assertTrue(queue.offer(value));
        }

        final ArrayList drained = new ArrayList();
        queue.drainTo(drained, limit);

        final int expectedCount = Math.min(limit, SIZE);
        assertEquals(expectedCount, drained.size());
        assertEquals(SIZE - expectedCount, queue.size());
        for (int pos = 0; pos < expectedCount; pos++) {
            assertEquals(pos, drained.get(pos));
        }

        // Discard whatever was not drained so the next round starts empty.
        while (queue.poll() != null) {
            // keep polling
        }
    }
}
/**
 * Applies an add action a random number of times and checks, after each
 * application, how far the node returned by {@code tail(q)} is from the end
 * of the chain: if it had a successor before the add, it must have none
 * afterwards; otherwise it must have exactly one successor afterwards.
 *
 * @param addAction the add operation under test (supplied by the
 *        "addActions" data provider)
 */
@Test(dataProvider = "addActions")
public void addActionsOneNodeSlack(
        Consumer<LinkedTransferQueue> addAction) {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final int additions = 1 + rnd.nextInt(9);
    for (int round = 0; round < additions; round++) {
        final boolean tailHadSuccessor = next(tail(queue)) != null;
        addAction.accept(queue);
        if (!tailHadSuccessor) {
            assertNotNull(next(tail(queue)));
            assertNull(next(next(tail(queue))));
        } else {
            assertNull(next(tail(queue)));
        }
        assertInvariants(queue);
    }
}
/**
 * When a consumer is already waiting in a timed poll, tryTransfer
 * succeeds and hands the object straight to that consumer.
 */
public void testTryTransfer3() throws InterruptedException {
    final Object payload = new Object();
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedRunnable producerTask = new CheckedRunnable() {
        public void realRun() {
            // Hold off until the main thread is blocked in poll.
            while (!queue.hasWaitingConsumer()) {
                Thread.yield();
            }
            assertTrue(queue.hasWaitingConsumer());
            checkEmpty(queue);
            assertTrue(queue.tryTransfer(payload));
        }
    };
    final Thread producer = newStartedThread(producerTask);

    final long pollStart = System.nanoTime();
    assertSame(payload, queue.poll(LONG_DELAY_MS, MILLISECONDS));
    assertTrue(millisElapsedSince(pollStart) < LONG_DELAY_MS);
    checkEmpty(queue);
    awaitTermination(producer);
}
/**
 * With no consumer present, a timed tryTransfer returns false once
 * the timeout has elapsed and leaves the queue empty.
 */
public void testTryTransfer6() throws InterruptedException {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final CheckedRunnable transferTask = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            final long attemptStart = System.nanoTime();
            final boolean transferred =
                queue.tryTransfer(new Object(), timeoutMillis(), MILLISECONDS);
            assertFalse(transferred);
            assertTrue(millisElapsedSince(attemptStart) >= timeoutMillis());
            checkEmpty(queue);
        }
    };
    final Thread transferrer = newStartedThread(transferTask);
    awaitTermination(transferrer);
    checkEmpty(queue);
}
/**
 * The iterator visits every element of the queue, in the order in
 * which take() returns them.
 */
public void testIterator() throws InterruptedException {
    final LinkedTransferQueue queue = populatedQueue(SIZE);

    // First pass: every iterated element is contained in the queue.
    Iterator cursor = queue.iterator();
    int visited = 0;
    while (cursor.hasNext()) {
        assertTrue(queue.contains(cursor.next()));
        visited++;
    }
    assertEquals(visited, SIZE);
    assertIteratorExhausted(cursor);

    // Second pass: iteration order matches removal order.
    cursor = queue.iterator();
    int matched = 0;
    while (cursor.hasNext()) {
        assertEquals(cursor.next(), queue.take());
        matched++;
    }
    assertEquals(matched, SIZE);
    assertIteratorExhausted(cursor);
}
/**
 * Health-checks an endpoint on the "/no_ping_at_all" path and verifies that
 * the second (long-polling) request fails with a response timeout, that no
 * informational headers were recorded for it, and that the group ends up
 * with no endpoints.
 */
@Test
void noPingAtAll() throws Exception {
    final BlockingQueue<RequestLogAccess> logs = new LinkedTransferQueue<>();
    this.healthCheckRequestLogs = logs;

    final Endpoint target = Endpoint.of("127.0.0.1", server.httpPort());
    final HealthCheckedEndpointGroupBuilder groupBuilder =
            HealthCheckedEndpointGroup.builder(target, "/no_ping_at_all");
    try (HealthCheckedEndpointGroup group = build(groupBuilder)) {
        Thread.sleep(3000);
        assertFirstRequest(logs);

        // The second request is expected to time out while long-polling.
        final RequestLog secondRequestLog = logs.take().whenComplete().join();
        assertThat(secondRequestLog.responseCause()).isInstanceOf(ResponseTimeoutException.class);

        // No '102 Processing' headers may have been received.
        final BlockingQueue<ResponseHeaders> informationals =
                secondRequestLog.context().attr(RECEIVED_INFORMATIONALS);
        assertThat(informationals).isEmpty();

        // In the end the group must expose no endpoints.
        // NOTE(review): the previous comment said the endpoint "must stay healthy",
        // which does not match this isEmpty() assertion — confirm the intent.
        assertThat(group.endpoints()).isEmpty();
    }
}
/**
 * A timed tryTransfer queues behind elements already present; it
 * completes only after those are removed and its own element is
 * handed to a poll or take.
 */
public void testTryTransfer7() throws InterruptedException {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    assertTrue(queue.offer(four));

    final CheckedRunnable transferTask = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            final long attemptStart = System.nanoTime();
            assertTrue(queue.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
            assertTrue(millisElapsedSince(attemptStart) < LONG_DELAY_MS);
            checkEmpty(queue);
        }
    };
    final Thread transferrer = newStartedThread(transferTask);

    // Wait until the transferring thread's element is visible behind "four".
    while (queue.size() != 2) {
        Thread.yield();
    }
    assertEquals(2, queue.size());
    assertSame(four, queue.poll());
    assertSame(five, queue.poll());
    checkEmpty(queue);
    awaitTermination(transferrer);
}
/**
 * Builds one instance of each concurrent queue implementation under test.
 * The two {@code ArrayBlockingQueue} variants (non-fair and fair) are sized
 * by {@code count}, which is defined outside this method.
 *
 * @return the queues, in randomized order
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());
    candidates.add(new ArrayBlockingQueue<Boolean>(count, false));
    candidates.add(new ArrayBlockingQueue<Boolean>(count, true));
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // More implementations can be obtained from
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html, e.g.:
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order so no implementation is systematically measured
    // first ("first fast, second slow" benchmark effect).
    Collections.shuffle(candidates);
    return candidates;
}
/**
 * transfer blocks until a poll occurs; that poll returns the
 * transferred element.
 */
public void testTransfer2() throws InterruptedException {
    final LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
    final CountDownLatch started = new CountDownLatch(1);
    final CheckedRunnable transferTask = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            started.countDown();
            queue.transfer(five);
            checkEmpty(queue);
        }
    };
    final Thread transferrer = newStartedThread(transferTask);
    started.await();

    // True once the queue reports exactly the one pending element.
    final Callable<Boolean> holdsExactlyOne = new Callable<Boolean>() {
        public Boolean call() {
            if (queue.isEmpty()) {
                return false;
            }
            return queue.size() == 1;
        }
    };
    waitForThreadToEnterWaitState(transferrer, holdsExactlyOne);

    assertSame(five, queue.poll());
    checkEmpty(queue);
    awaitTermination(transferrer);
}
/**
 * Demonstrates interrupting a thread blocked in a timed poll: a worker
 * thread repeatedly polls an empty queue with a 3-second timeout, and the
 * main thread interrupts it after 1.5 seconds. The worker restores its
 * interrupt flag in the catch block, which ends its loop.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while sleeping
 */
public static void main(String[] args) throws InterruptedException {
    System.setProperty("java.util.logging.SimpleFormatter.format",
            "[%1$tT] [%4$-7s] %5$s %n");

    final Runnable pollingLoop = () -> {
        final TransferQueue<String> emptyQueue = new LinkedTransferQueue<>();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                logger.info(() -> "For 3 seconds the thread "
                        + Thread.currentThread().getName()
                        + " will try to poll an element from queue ...");
                emptyQueue.poll(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                logger.severe(() -> "InterruptedException! The thread "
                        + Thread.currentThread().getName() + " was intrrupted!");
                // Restore the flag so the loop condition sees the interrupt
                // (comment this line to see the effect).
                Thread.currentThread().interrupt();
            }
        }
        logger.info(() -> "The execution was stopped!");
    };

    final Thread worker = new Thread(pollingLoop);
    worker.start();
    Thread.sleep(1500);
    worker.interrupt();
}
/**
 * toArray(a) fills and returns the supplied array when it is large
 * enough, with the elements in FIFO order.
 */
public void testToArray2() {
    final LinkedTransferQueue<Integer> queue = populatedQueue(SIZE);
    final Integer[] target = new Integer[SIZE];
    final Integer[] returned = queue.toArray(target);
    // The supplied array is reused, not copied.
    assertSame(target, returned);
    for (Integer expected : target) {
        assertSame(expected, queue.poll());
    }
}
/**
 * addAll of a collection containing a null element throws
 * NullPointerException, possibly after some elements were added.
 */
public void testAddAll3() {
    final LinkedTransferQueue queue = new LinkedTransferQueue();
    final Integer[] withTrailingNull = new Integer[SIZE];
    // Fill every slot except the last, which stays null.
    int idx = 0;
    while (idx < SIZE - 1) {
        withTrailingNull[idx] = idx;
        idx++;
    }
    try {
        queue.addAll(Arrays.asList(withTrailingNull));
        shouldThrow();
    } catch (NullPointerException success) {
        // expected
    }
}