下面列出了怎么用java.util.concurrent.Exchanger的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testRateLimit() throws Exception {
final TestProducer p = new TestProducer();
MockStages.setSourceCapture(p);
final ProductionPipeline pipeline = createProductionPipeline(DeliveryGuarantee.AT_MOST_ONCE, true, 10L, PipelineType.DEFAULT);
pipeline.registerStatusListener(new MyStateListener());
final Exchanger<Double> rate = new Exchanger<>();
new Thread() {
@Override
public void run() {
try {
long start = System.nanoTime();
pipeline.run();
rate.exchange(p.count.doubleValue() * 1000 * 1000 * 1000 / (System.nanoTime() - start));
} catch (Exception ex) {
}
}
}.start();
Thread.sleep(10000);
pipeline.stop();
Double rateAchieved = rate.exchange(0.0);
// To account for the slight loss of precision, we compare the "long-ified" versions.
Assert.assertTrue(rateAchieved.longValue() <= 10);
}
@Test
public void test11() throws Exception {
Exchanger<List<String>> exchanger = new Exchanger<>();
List<String> producerList = new CopyOnWriteArrayList<>();
List<String> consumerList = new CopyOnWriteArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(() -> {
ProducerExchanger producerExchanger = new ProducerExchanger(exchanger, producerList);
while (true) {
producerExchanger.produce();
}
});
executorService.submit(() -> {
ConsumerExchanger consumerExchanger = new ConsumerExchanger(exchanger, consumerList);
while (true) {
consumerExchanger.consumer();
}
});
executorService.shutdown();
TimeUnit.SECONDS.sleep(1000);
}
@Override
protected void clientClosed(RMIConnection conn) throws IOException {
System.out.println("clientClosed, will call connectorServer.stop");
final Exchanger<Void> x = new Exchanger<Void>();
Thread t = new Thread() {
public void run() {
try {
connectorServer.stop();
} catch (Exception e) {
fail(e);
}
}
};
t.setName("connectorServer.stop");
t.start();
waitForBlock(t);
/* If this thread is synchronized on RMIServerImpl, then
* the thread that does connectorServer.stop will acquire
* the clientList lock and then block waiting for the RMIServerImpl
* lock. Our call to super.clientClosed will then deadlock because
* it needs to acquire the clientList lock.
*/
System.out.println("calling super.clientClosed");
System.out.flush();
super.clientClosed(conn);
}
@Override
protected void clientClosed(RMIConnection conn) throws IOException {
System.out.println("clientClosed, will call connectorServer.stop");
final Exchanger<Void> x = new Exchanger<Void>();
Thread t = new Thread() {
public void run() {
try {
connectorServer.stop();
} catch (Exception e) {
fail(e);
}
}
};
t.setName("connectorServer.stop");
t.start();
waitForBlock(t);
/* If this thread is synchronized on RMIServerImpl, then
* the thread that does connectorServer.stop will acquire
* the clientList lock and then block waiting for the RMIServerImpl
* lock. Our call to super.clientClosed will then deadlock because
* it needs to acquire the clientList lock.
*/
System.out.println("calling super.clientClosed");
System.out.flush();
super.clientClosed(conn);
}
@Test
public void testNotCloseZkWhenPending() throws Exception {
ZooKeeper mockedZK = mock(ZooKeeper.class);
Exchanger<AsyncCallback.DataCallback> exchanger = new Exchanger<>();
doAnswer(i -> {
exchanger.exchange(i.getArgument(2));
return null;
}).when(mockedZK).getData(anyString(), anyBoolean(),
any(AsyncCallback.DataCallback.class), any());
doAnswer(i -> null).when(mockedZK).close();
when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED);
RO_ZK.zookeeper = mockedZK;
CompletableFuture<byte[]> future = RO_ZK.get(PATH);
AsyncCallback.DataCallback callback = exchanger.exchange(null);
// 2 * keep alive time to ensure that we will not close the zk when there are pending requests
Thread.sleep(6000);
assertNotNull(RO_ZK.zookeeper);
verify(mockedZK, never()).close();
callback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
assertArrayEquals(DATA, future.get());
// now we will close the idle connection.
waitForIdleConnectionClosed();
verify(mockedZK, times(1)).close();
}
@Override
protected void clientClosed(RMIConnection conn) throws IOException {
System.out.println("clientClosed, will call connectorServer.stop");
final Exchanger<Void> x = new Exchanger<Void>();
Thread t = new Thread() {
public void run() {
try {
connectorServer.stop();
} catch (Exception e) {
fail(e);
}
}
};
t.setName("connectorServer.stop");
t.start();
waitForBlock(t);
/* If this thread is synchronized on RMIServerImpl, then
* the thread that does connectorServer.stop will acquire
* the clientList lock and then block waiting for the RMIServerImpl
* lock. Our call to super.clientClosed will then deadlock because
* it needs to acquire the clientList lock.
*/
System.out.println("calling super.clientClosed");
System.out.flush();
super.clientClosed(conn);
}
/**
* exchange exchanges objects across two threads
*/
public void testExchange() {
final Exchanger e = new Exchanger();
Thread t1 = newStartedThread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertSame(one, e.exchange(two));
assertSame(two, e.exchange(one));
}});
Thread t2 = newStartedThread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertSame(two, e.exchange(one));
assertSame(one, e.exchange(two));
}});
awaitTermination(t1);
awaitTermination(t2);
}
/**
* timed exchange exchanges objects across two threads
*/
public void testTimedExchange() {
final Exchanger e = new Exchanger();
Thread t1 = newStartedThread(new CheckedRunnable() {
public void realRun() throws Exception {
assertSame(one, e.exchange(two, LONG_DELAY_MS, MILLISECONDS));
assertSame(two, e.exchange(one, LONG_DELAY_MS, MILLISECONDS));
}});
Thread t2 = newStartedThread(new CheckedRunnable() {
public void realRun() throws Exception {
assertSame(two, e.exchange(one, LONG_DELAY_MS, MILLISECONDS));
assertSame(one, e.exchange(two, LONG_DELAY_MS, MILLISECONDS));
}});
awaitTermination(t1);
awaitTermination(t2);
}
static void oneRun(int nthreads, int iters) throws Exception {
LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
CyclicBarrier barrier = new CyclicBarrier(nthreads + 1, timer);
Exchanger<Int> l = null;
Exchanger<Int> r = new Exchanger<>();
for (int i = 0; i < nthreads; ++i) {
pool.execute(new Stage(l, r, barrier, iters));
l = r;
r = (i+2 < nthreads) ? new Exchanger<Int>() : null;
}
barrier.await();
barrier.await();
long time = timer.getTime();
if (print)
System.out.println(LoopHelpers.rightJustify(time / (iters * nthreads + iters * (nthreads-2))) + " ns per transfer");
}
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException, ExecutionException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result = CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
private synchronized boolean startCapture(
final int width, final int height,
final int min_mfps, final int max_mfps) {
Log.d(TAG, "startCapture: " + width + "x" + height + "@" +
min_mfps + ":" + max_mfps);
if (cameraThread != null || cameraThreadHandler != null) {
throw new RuntimeException("Camera thread already started!");
}
Exchanger<Handler> handlerExchanger = new Exchanger<Handler>();
cameraThread = new CameraThread(handlerExchanger);
cameraThread.start();
cameraThreadHandler = exchange(handlerExchanger, null);
final Exchanger<Boolean> result = new Exchanger<Boolean>();
cameraThreadHandler.post(new Runnable() {
@Override public void run() {
startCaptureOnCameraThread(width, height, min_mfps, max_mfps, result);
}
});
boolean startResult = exchange(result, false); // |false| is a dummy value.
orientationListener.enable();
return startResult;
}
public void testContendedPut() throws Exception
{
final int max = 1000000;
CachedQuery orderedData[] = new CachedQuery[max];
for(int i=0;i<max;i++)
{
CachedQuery o = createCachedQuery(i);
orderedData[i] = o;
}
final CachedQuery data[] = shuffle(orderedData);
final Exchanger exchanger = new Exchanger();
final NonLruQueryIndex index = new NonLruQueryIndex();
PutRunnableWithExchange first = new PutRunnableWithExchange(0, max, data, index, exchanger);
PutRunnableWithExchange second = new PutRunnableWithExchange(0, max, data, index, exchanger);
ExceptionCatchingThread firstThread = new ExceptionCatchingThread(first);
ExceptionCatchingThread secondThread = new ExceptionCatchingThread(second);
firstThread.start();
secondThread.start();
firstThread.joinWithExceptionHandling();
secondThread.joinWithExceptionHandling();
assertEquals(max, index.size());
assertEquals(max, index.getEntryCount());
first.verifyExistence();
second.verifyExistence();
}
public void testContendedGetIfAbsentPut() throws Exception
{
final int max = 1000000;
String orderedData[] = new String[max];
for(int i=0;i<max;i++)
{
String o = createData(i);
orderedData[i] = o;
}
final String data[] = shuffle(orderedData);
final Exchanger exchanger = new Exchanger();
final StringIndex index = createStringPool();
GetIfAbsentPutRunnableWithExchange first = new GetIfAbsentPutRunnableWithExchange(0, max, data, index, exchanger);
GetIfAbsentPutRunnableWithExchange second = new GetIfAbsentPutRunnableWithExchange(0, max, data, index, exchanger);
ExceptionCatchingThread firstThread = new ExceptionCatchingThread(first);
ExceptionCatchingThread secondThread = new ExceptionCatchingThread(second);
firstThread.start();
secondThread.start();
firstThread.joinWithExceptionHandling();
secondThread.joinWithExceptionHandling();
assertEquals(max, index.size());
first.verifyExistence();
second.verifyExistence();
}
public void testContendedGetIfAbsentPut() throws Exception
{
final int max = 1000000;
String orderedData[] = new String[max];
for(int i=0;i<max;i++)
{
String o = createData(i);
orderedData[i] = o;
}
final String data[] = shuffle(orderedData);
final Exchanger exchanger = new Exchanger();
final ConcurrentWeakPool index = createStringPool();
GetIfAbsentPutRunnableWithExchange first = new GetIfAbsentPutRunnableWithExchange(0, max, data, index, exchanger);
GetIfAbsentPutRunnableWithExchange second = new GetIfAbsentPutRunnableWithExchange(0, max, data, index, exchanger);
ExceptionCatchingThread firstThread = new ExceptionCatchingThread(first);
ExceptionCatchingThread secondThread = new ExceptionCatchingThread(second);
firstThread.start();
secondThread.start();
firstThread.joinWithExceptionHandling();
secondThread.joinWithExceptionHandling();
assertEquals(max, index.size());
first.verifyExistence();
second.verifyExistence();
}
/**
* exchange exchanges objects across two threads
*/
public void testExchange() {
final Exchanger e = new Exchanger();
Thread t1 = newStartedThread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertSame(one, e.exchange(two));
assertSame(two, e.exchange(one));
}});
Thread t2 = newStartedThread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertSame(two, e.exchange(one));
assertSame(one, e.exchange(two));
}});
awaitTermination(t1);
awaitTermination(t2);
}
public void testContendedGetIfAbsentPut() throws Exception
{
final int max = 1000000;
TestObject orderedData[] = new TestObject[max];
for(int i=0;i<max;i++)
{
TestObject o = new TestObject(i);
orderedData[i] = o;
}
final TestObject data[] = shuffle(orderedData);
final Exchanger exchanger = new Exchanger();
Extractor[] extractors = {new ShiftedIntExtractor(1)};
final ConcurrentFullUniqueIndex<TestObject> index = new ConcurrentFullUniqueIndex(extractors, 7);
PutIfAbsentRunnableWithExchange first = new PutIfAbsentRunnableWithExchange(0, max, data, index, exchanger);
PutIfAbsentRunnableWithExchange second = new PutIfAbsentRunnableWithExchange(0, max, data, index, exchanger);
ExceptionCatchingThread firstThread = new ExceptionCatchingThread(first);
ExceptionCatchingThread secondThread = new ExceptionCatchingThread(second);
firstThread.start();
secondThread.start();
firstThread.joinWithExceptionHandling();
secondThread.joinWithExceptionHandling();
assertEquals(max, index.size());
first.verifyExistence();
second.verifyExistence();
}
@Override
protected void clientClosed(RMIConnection conn) throws IOException {
System.out.println("clientClosed, will call connectorServer.stop");
final Exchanger<Void> x = new Exchanger<Void>();
Thread t = new Thread() {
public void run() {
try {
connectorServer.stop();
} catch (Exception e) {
fail(e);
}
}
};
t.setName("connectorServer.stop");
t.start();
waitForBlock(t);
/* If this thread is synchronized on RMIServerImpl, then
* the thread that does connectorServer.stop will acquire
* the clientList lock and then block waiting for the RMIServerImpl
* lock. Our call to super.clientClosed will then deadlock because
* it needs to acquire the clientList lock.
*/
System.out.println("calling super.clientClosed");
System.out.flush();
super.clientClosed(conn);
}
@Override
protected void clientClosed(RMIConnection conn) throws IOException {
System.out.println("clientClosed, will call connectorServer.stop");
final Exchanger<Void> x = new Exchanger<Void>();
Thread t = new Thread() {
public void run() {
try {
connectorServer.stop();
} catch (Exception e) {
fail(e);
}
}
};
t.setName("connectorServer.stop");
t.start();
waitForBlock(t);
/* If this thread is synchronized on RMIServerImpl, then
* the thread that does connectorServer.stop will acquire
* the clientList lock and then block waiting for the RMIServerImpl
* lock. Our call to super.clientClosed will then deadlock because
* it needs to acquire the clientList lock.
*/
System.out.println("calling super.clientClosed");
System.out.flush();
super.clientClosed(conn);
}
static <T> Collector<T, ?, ?> secondConcurrentAddAssertingCollector(T first, T second) {
return Collector.<T, Exchanger<T>>of(
Exchanger::new,
(exchanger, t) -> {
T t1;
try {
t1 = exchanger.exchange(t, 1, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e) {
throw new AssertionError("Unexpected exception: ", e);
}
assertTrue((t1.equals(first) && t.equals(second) || t1.equals(second) && t.equals(first)));
},
(a, b) -> {
throw new AssertionError(
"Combining is not expected within secondConcurrentAddAssertingCollector");
},
Collector.Characteristics.CONCURRENT,
Collector.Characteristics.UNORDERED,
Collector.Characteristics.IDENTITY_FINISH);
}
@Override
protected void clientClosed(RMIConnection conn) throws IOException {
System.out.println("clientClosed, will call connectorServer.stop");
final Exchanger<Void> x = new Exchanger<Void>();
Thread t = new Thread() {
public void run() {
try {
connectorServer.stop();
} catch (Exception e) {
fail(e);
}
}
};
t.setName("connectorServer.stop");
t.start();
waitForBlock(t);
/* If this thread is synchronized on RMIServerImpl, then
* the thread that does connectorServer.stop will acquire
* the clientList lock and then block waiting for the RMIServerImpl
* lock. Our call to super.clientClosed will then deadlock because
* it needs to acquire the clientList lock.
*/
System.out.println("calling super.clientClosed");
System.out.flush();
super.clientClosed(conn);
}
/**
* interrupt during wait for exchange throws IE
*/
public void testExchange_InterruptedException() {
final Exchanger e = new Exchanger();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedInterruptedRunnable() {
public void realRun() throws InterruptedException {
threadStarted.countDown();
e.exchange(one);
}});
await(threadStarted);
t.interrupt();
awaitTermination(t);
}
public BiometricMakeCredentialCallback(Authenticator authenticator, AuthenticatorMakeCredentialOptions options, PublicKeyCredentialSource credentialSource, Exchanger<AttestationObject> exchanger) {
super();
this.authenticator = authenticator;
this.options = options;
this.credentialSource = credentialSource;
this.exchanger = exchanger;
}
public BiometricGetAssertionCallback(Authenticator authenticator, AuthenticatorGetAssertionOptions options, PublicKeyCredentialSource selectedCredential, Exchanger<AuthenticatorGetAssertionResult> exchanger) {
super();
this.authenticator = authenticator;
this.options = options;
this.selectedCredential = selectedCredential;
this.exchanger = exchanger;
}
public PublicKeyCredentialSource selectFrom(List<PublicKeyCredentialSource> credentialList) {
// check to make sure fragmentActivity is populated
if (fragmentActivity == null) {
Log.w(TAG, "Must populate fragment activity before calling promptUser");
return null;
}
// store some instance vars for the dialog prompt to use
this.credentialList = credentialList;
this.exchanger = new Exchanger<PublicKeyCredentialSource>();
// show dialog prompt to user
FragmentActivity fragmentActivityStrongRef = fragmentActivity.get();
if (fragmentActivityStrongRef == null) {
Log.w(TAG,"FragmentActivity reference was garbage collected. Returning first matching credential.");
return credentialList.get(0);
}
show(fragmentActivityStrongRef.getSupportFragmentManager(), "credential");
// wait to retrieve credential
PublicKeyCredentialSource selectedCredential;
try {
selectedCredential = exchanger.exchange(null);
} catch (InterruptedException exception) {
Log.w(TAG, "exchange interrupted: " + exception.toString());
return null;
}
return selectedCredential;
}
private void verifyInvokerThread(final Executor executor) throws InterruptedException {
this.executor = executor;
Exchanger<Thread> invoker = new Exchanger<>();
executor.schedule(() -> {
try {
invoker.exchange(Thread.currentThread());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Exchange interrupted.");
}
}, 1, TimeUnit.MILLISECONDS);
Thread taskInvoker = invoker.exchange(Thread.currentThread());
assertThat("Unexpected thread invoked the task.", taskInvoker.getName(),
startsWith(EXPECTED_THREAD_PREFIX));
}
@Test
public void testSerializerGetsDifferentByteBufferOnRead() {
final Exchanger<ByteBuffer> exchanger = new Exchanger<>();
final ReadExchangeSerializer serializer = new ReadExchangeSerializer(exchanger);
final SerializedOnHeapValueHolder<String> valueHolder = new SerializedOnHeapValueHolder<>("test it!", System
.currentTimeMillis(), false, serializer);
new Thread(valueHolder::get).start();
valueHolder.get();
}
public static void main(String[] args) {
Exchanger<List<Integer>> exchanger = new Exchanger<>();
new Consumer(exchanger).start();
//方便调试,让consumer先执行exchange
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Producer(exchanger).start();
}
private void setPreviewDisplayOnCameraThread(
SurfaceHolder holder, Exchanger<IOException> result) {
try {
camera.setPreviewDisplay(holder);
} catch (IOException e) {
exchange(result, e);
return;
}
exchange(result, null);
return;
}
/**
* interrupt during wait for exchange throws IE
*/
public void testExchange_InterruptedException() {
final Exchanger e = new Exchanger();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedInterruptedRunnable() {
public void realRun() throws InterruptedException {
threadStarted.countDown();
e.exchange(one);
}});
await(threadStarted);
t.interrupt();
awaitTermination(t);
}
/**
* interrupt during wait for timed exchange throws IE
*/
public void testTimedExchange_InterruptedException() {
final Exchanger e = new Exchanger();
final CountDownLatch threadStarted = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedInterruptedRunnable() {
public void realRun() throws Exception {
threadStarted.countDown();
e.exchange(null, LONG_DELAY_MS, MILLISECONDS);
}});
await(threadStarted);
t.interrupt();
awaitTermination(t);
}