下面列出了 java.util.concurrent.Exchanger#exchange() 的实例代码，你可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Runs a production pipeline on a background thread for roughly ten seconds and
 * checks that the observed record rate (records per second, derived from
 * {@code p.count} and the elapsed nanoseconds) does not exceed 10.
 *
 * <p>The background thread hands the measured rate to the test thread through an
 * {@link Exchanger}. Failures on the background thread are recorded and the wait
 * on the test thread is bounded, so a broken pipeline fails the test instead of
 * hanging it.
 *
 * <p>NOTE(review): the {@code 10L} passed to {@code createProductionPipeline} is
 * presumably the configured rate limit - confirm against that helper.
 */
@Test
public void testRateLimit() throws Exception {
    final TestProducer p = new TestProducer();
    MockStages.setSourceCapture(p);
    final ProductionPipeline pipeline = createProductionPipeline(DeliveryGuarantee.AT_MOST_ONCE, true, 10L, PipelineType.DEFAULT);
    pipeline.registerStatusListener(new MyStateListener());
    final Exchanger<Double> rate = new Exchanger<>();
    // Captures whatever went wrong on the runner thread so it can be reported by the test thread.
    final java.util.concurrent.atomic.AtomicReference<Throwable> runnerFailure =
        new java.util.concurrent.atomic.AtomicReference<>();
    new Thread() {
        @Override
        public void run() {
            try {
                long start = System.nanoTime();
                pipeline.run();
                // count * 1e9 / elapsedNanos == records per second
                rate.exchange(p.count.doubleValue() * 1000 * 1000 * 1000 / (System.nanoTime() - start));
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                runnerFailure.set(ex);
            } catch (Exception ex) {
                // Do not swallow: the test thread reports this if the rate never arrives.
                runnerFailure.set(ex);
            }
        }
    }.start();
    Thread.sleep(10000);
    pipeline.stop();
    Double rateAchieved;
    try {
        // Bounded wait: if the runner failed it will never reach its exchange() call.
        rateAchieved = rate.exchange(0.0, 30, java.util.concurrent.TimeUnit.SECONDS);
    } catch (java.util.concurrent.TimeoutException timeout) {
        throw new AssertionError("Pipeline runner thread never reported a rate", runnerFailure.get());
    }
    // To account for the slight loss of precision, we compare the "long-ified" versions.
    Assert.assertTrue(rateAchieved.longValue() <= 10);
}
/**
 * Checks that the ZooKeeper client held by {@code RO_ZK} is not closed while a
 * {@code getData} request is still pending, and is closed once after the request
 * completes and the connection becomes idle.
 *
 * <p>The mocked {@code getData} hands its callback argument to the test thread via
 * an {@link Exchanger}, so the test decides when the request completes.
 */
@Test
public void testNotCloseZkWhenPending() throws Exception {
    Exchanger<AsyncCallback.DataCallback> callbackHandoff = new Exchanger<>();
    ZooKeeper zkMock = mock(ZooKeeper.class);
    // Intercept getData and pass its callback (argument index 2) over to the test thread.
    doAnswer(invocation -> {
        callbackHandoff.exchange(invocation.getArgument(2));
        return null;
    }).when(zkMock).getData(anyString(), anyBoolean(),
        any(AsyncCallback.DataCallback.class), any());
    doAnswer(invocation -> null).when(zkMock).close();
    when(zkMock.getState()).thenReturn(ZooKeeper.States.CONNECTED);
    RO_ZK.zookeeper = zkMock;

    CompletableFuture<byte[]> pendingRead = RO_ZK.get(PATH);
    AsyncCallback.DataCallback capturedCallback = callbackHandoff.exchange(null);

    // 2 * keep alive time to ensure that we will not close the zk when there are pending requests
    Thread.sleep(6000);
    assertNotNull(RO_ZK.zookeeper);
    verify(zkMock, never()).close();

    // Complete the outstanding request by hand and check the data reaches the future.
    capturedCallback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
    assertArrayEquals(DATA, pendingRead.get());

    // now we will close the idle connection.
    waitForIdleConnectionClosed();
    verify(zkMock, times(1)).close();
}
/**
 * Exchanges one message in each direction between the test thread and an
 * asynchronously executed task, and checks that each side receives what the
 * other side offered.
 */
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException, ExecutionException {
    final Exchanger<String> channel = new Exchanger<>();

    // Asynchronous peer: offers "from runner" and expects "to runner" back.
    CompletableFuture<Void> peer = CompletableFuture.runAsync(() -> {
        String received;
        try {
            received = channel.exchange("from runner");
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
        assertEquals("to runner", received);
    });

    // Test thread: offers "to runner" and expects the peer's message.
    String fromPeer = channel.exchange("to runner");
    assertEquals("from runner", fromPeer);

    // join() rethrows any failure raised inside the asynchronous peer.
    peer.join();
}
/**
 * Schedules a task on the given executor and asserts that the thread which runs
 * it has a name starting with {@code EXPECTED_THREAD_PREFIX}.
 *
 * <p>The scheduled task passes its own {@link Thread} to the calling thread
 * through an {@link Exchanger}.
 *
 * @param executor the executor under test; also stored in {@code this.executor}
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for the scheduled task to report its thread
 */
private void verifyInvokerThread(final Executor executor) throws InterruptedException {
    this.executor = executor;
    Exchanger<Thread> invoker = new Exchanger<>();
    executor.schedule(() -> {
        try {
            invoker.exchange(Thread.currentThread());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original exception as the cause so the failure is diagnosable.
            throw new AssertionError("Exchange interrupted.", e);
        }
    }, 1, TimeUnit.MILLISECONDS);
    Thread taskInvoker = invoker.exchange(Thread.currentThread());
    assertThat("Unexpected thread invoked the task.", taskInvoker.getName(),
        startsWith(EXPECTED_THREAD_PREFIX));
}
/**
 * Blocks until another thread arrives at the same exchanger, hands it
 * {@code object}, and returns whatever the other thread offered.
 *
 * @param exchanger the rendezvous point shared with the other thread
 * @param object the value to hand to the other thread
 * @return the value offered by the other thread, or {@code null} if the wait was
 *         interrupted (the interrupt status is restored in that case)
 */
// The raw Exchanger parameter is part of the existing signature, so the unchecked call is unavoidable here.
@SuppressWarnings({"rawtypes", "unchecked"})
protected static Object waitForOtherThreadAndPassObject(final Exchanger exchanger, Object object)
{
    Object result = null;
    try
    {
        result = exchanger.exchange(object);
    }
    catch (InterruptedException e)
    {
        // exchange() clears the interrupt status when it throws; restore it so callers can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
/**
 * Waits at the exchanger for a partner thread, gives it {@code object}, and
 * returns the partner's value.
 *
 * @param exchanger the exchanger both threads meet at
 * @param object the value passed to the partner thread
 * @return the partner's value, or {@code null} when the waiting thread was
 *         interrupted (its interrupt status is set again before returning)
 */
// The raw Exchanger parameter is part of the existing signature, so the unchecked call is unavoidable here.
@SuppressWarnings({"rawtypes", "unchecked"})
protected static Object waitForOtherThreadAndPassObject(final Exchanger exchanger, Object object)
{
    Object result = null;
    try
    {
        result = exchanger.exchange(object);
    }
    catch (InterruptedException e)
    {
        // Re-assert the interrupt: exchange() cleared it when throwing.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
/**
 * Performs a blocking exchange: offers {@code object} to the other thread using
 * the same exchanger and returns the object received in return.
 *
 * @param exchanger the shared exchanger
 * @param object the object to offer
 * @return the object received from the other thread, or {@code null} if the
 *         thread was interrupted while waiting (interrupt status is restored)
 */
// The raw Exchanger parameter is part of the existing signature, so the unchecked call is unavoidable here.
@SuppressWarnings({"rawtypes", "unchecked"})
protected static Object waitForOtherThreadAndPassObject(final Exchanger exchanger, Object object)
{
    Object result = null;
    try
    {
        result = exchanger.exchange(object);
    }
    catch (InterruptedException e)
    {
        // Keep the interruption visible to the caller instead of silently dropping it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
/**
 * Synchronizes with another thread through {@code exchanger}: passes
 * {@code object} to it and returns the object it passed back.
 *
 * @param exchanger the exchanger used as the synchronization point
 * @param object the object to pass to the other thread
 * @return the other thread's object, or {@code null} if this thread was
 *         interrupted before the exchange completed (interrupt status restored)
 */
// The raw Exchanger parameter is part of the existing signature, so the unchecked call is unavoidable here.
@SuppressWarnings({"rawtypes", "unchecked"})
protected static Object waitForOtherThreadAndPassObject(final Exchanger exchanger, Object object)
{
    Object result = null;
    try
    {
        result = exchanger.exchange(object);
    }
    catch (InterruptedException e)
    {
        // exchange() resets the interrupt status on throw; set it again for the caller.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
/**
 * Blocks until another thread arrives at the same exchanger, hands it
 * {@code object}, and returns the value the other thread offered. Debug messages
 * are logged before and after a successful wait.
 *
 * @param exchanger the rendezvous point shared with the other thread
 * @param object the value to hand to the other thread
 * @return the value offered by the other thread, or {@code null} if the wait was
 *         interrupted (the interrupt status is restored in that case)
 */
// The raw Exchanger parameter is part of the existing signature, so the unchecked call is unavoidable here.
@SuppressWarnings({"rawtypes", "unchecked"})
protected Object waitForOtherThreadAndPassObject(final Exchanger exchanger, Object object)
{
    Object result = null;
    try
    {
        getLogger().debug("waiting..");
        result = exchanger.exchange(object);
        getLogger().debug("done waiting");
    }
    catch (InterruptedException e)
    {
        // exchange() clears the interrupt status when it throws; restore it so callers can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
/**
 * Meets another thread at {@code exchanger}, trades {@code object} for the other
 * thread's value, and returns that value.
 *
 * @param exchanger the exchanger shared by the two threads
 * @param object the value this thread contributes to the trade
 * @return the value contributed by the other thread, or {@code null} if this
 *         thread was interrupted while waiting (interrupt status is restored)
 */
// The raw Exchanger parameter is part of the existing signature, so the unchecked call is unavoidable here.
@SuppressWarnings({"rawtypes", "unchecked"})
protected static Object waitForOtherThreadAndPassObject(final Exchanger exchanger, Object object)
{
    Object result = null;
    try
    {
        result = exchanger.exchange(object);
    }
    catch (InterruptedException e)
    {
        // Restore the interrupt status that exchange() cleared when it threw.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return result;
}
/**
 * Calls {@link Exchanger#exchange(Object)} and converts the checked
 * {@link InterruptedException} into an unchecked exception.
 *
 * @param exchanger the exchanger to rendezvous on
 * @param value the value to hand to the other party
 * @param <T> the type of the exchanged values
 * @return the value received from the other party
 * @throws RuntimeException wrapping the {@link InterruptedException} if the
 *         calling thread is interrupted; the interrupt status is restored first
 */
private static <T> T exchange(Exchanger<T> exchanger, T value) {
    try {
        return exchanger.exchange(value);
    } catch (InterruptedException e) {
        // exchange() clears the interrupt status when it throws; restore it before rethrowing.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Creates {@code fileName} under {@code src} with the given content, opens it in
 * the editor and returns the first opened editor pane.
 *
 * <p>A property-change listener on the editor cookie hands the pane to the calling
 * thread through an {@link Exchanger}; the calling thread waits at most five
 * seconds for it.
 *
 * @param src folder in which the file is created
 * @param fileName name of the file to create
 * @param code text written into the file
 * @param sourceLevel source level to set on the file, or {@code null} to leave it unset
 * @return the opened editor pane, never {@code null}
 * @throws AssertionError if no editor pane was reported within the timeout
 */
private JEditorPane paneFor(FileObject src, String fileName, String code, String sourceLevel) throws Exception, DataObjectNotFoundException, IOException {
    FileObject fromFO = FileUtil.createData(src, fileName);
    TestUtilities.copyStringToFile(fromFO, code);
    if (sourceLevel != null) {
        SourceUtilsTestUtil.setSourceLevel(fromFO, sourceLevel);
    }
    DataObject od = DataObject.find(fromFO);
    final EditorCookie.Observable ec = od.getCookie(EditorCookie.Observable.class);
    final Exchanger<JEditorPane> exch = new Exchanger<>();
    class L implements PropertyChangeListener {
        @Override
        public void propertyChange(PropertyChangeEvent evt) {
            if (!EditorCookie.Observable.PROP_OPENED_PANES.equals(evt.getPropertyName())) {
                return;
            }
            // we are in AWT
            JEditorPane[] panes = ec.getOpenedPanes();
            // Guard against an empty array as well as null before indexing.
            if (panes == null || panes.length == 0) {
                return;
            }
            try {
                exch.exchange(panes[0]);
            } catch (InterruptedException ex) {
                // Preserve the interruption for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
    L listener = new L();
    ec.addPropertyChangeListener(listener);
    JEditorPane pane = null;
    try {
        ec.open();
        ec.openDocument().putProperty(Language.class, JavaTokenId.language());
        pane = exch.exchange(null, 5, TimeUnit.SECONDS);
    } catch (java.util.concurrent.TimeoutException ex) {
        // Report the timeout with the same message the null check below uses.
        throw new AssertionError("Editor pane not opened", ex);
    } finally {
        ec.removePropertyChangeListener(listener);
    }
    assertNotNull("Editor pane not opened", pane);
    return pane;
}
/**
 * Invokes the bean's counted method once on a worker thread and checks the
 * concurrent gauge count while the method is executing and after it returns.
 *
 * <p>An {@link Exchanger} is used for three rendezvous between the test thread and
 * the worker: (1) the method has started, (2) the test supplies the value the
 * method returns, (3) the method has returned and reports that value. All three
 * waits on the test thread are bounded so a worker failure cannot hang the test.
 */
@Test
@InSequence(3)
public void callCountedMethodOnce() throws InterruptedException, TimeoutException {
    ConcurrentGauge cGauge = registry.getConcurrentGauge(cGaugeMID);
    assertThat("Concurrent Gauges is not registered correctly", cGauge, notNullValue());
    // Call the counted method, block and assert it's been counted
    final Exchanger<Long> exchanger = new Exchanger<>();
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                exchanger.exchange(bean.countedMethod(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        // First rendezvous: signal that the method is executing.
                        exchanger.exchange(0L);
                        // Second rendezvous: receive the value to return.
                        return exchanger.exchange(0L);
                    }
                }));
            }
            catch (InterruptedException cause) {
                throw new RuntimeException(cause);
            }
        }
    });
    final AtomicInteger uncaught = new AtomicInteger();
    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            uncaught.incrementAndGet();
        }
    });
    thread.start();
    // Wait until the method is executing and make sure that the counter has been incremented
    exchanger.exchange(0L, 5L, TimeUnit.SECONDS);
    assertThat("Concurrent Gauges count is incorrect", cGauge.getCount(), is(equalTo(COUNTER_COUNT.incrementAndGet())));
    // Exchange the result and unblock the method execution
    Long random = 1 + Math.round(Math.random() * (Long.MAX_VALUE - 1));
    exchanger.exchange(random, 5L, TimeUnit.SECONDS);
    // Wait until the method has returned; bounded so a worker that died inside the method cannot hang the test
    assertThat("Concurrent Gauges method return value is incorrect", exchanger.exchange(0L, 5L, TimeUnit.SECONDS), is(equalTo(random)));
    // Then make sure that the counter has been decremented
    assertThat("Concurrent Gauges count is incorrect", cGauge.getCount(), is(equalTo(COUNTER_COUNT.decrementAndGet())));
    // Finally make sure the calling thread returns correctly
    thread.join();
    assertThat("Exception thrown in method call thread", uncaught.get(), is(equalTo(0)));
}
/**
 * Invokes the bean's counted method once on a worker thread and checks that the
 * counter is incremented while the method runs and is not decremented afterwards.
 *
 * <p>An {@link Exchanger} is used for three rendezvous between the test thread and
 * the worker: (1) the method has started, (2) the test supplies the value the
 * method returns, (3) the method has returned and reports that value. All three
 * waits on the test thread are bounded so a worker failure cannot hang the test.
 */
@Test
@InSequence(3)
public void callCountedMethodOnce() throws InterruptedException, TimeoutException {
    Counter counter = registry.getCounter(counterMetricID);
    assertThat("Counter is not registered correctly", counter, notNullValue());
    // Call the counted method, block and assert it's been counted
    final Exchanger<Long> exchanger = new Exchanger<>();
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                exchanger.exchange(bean.countedMethod(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        // First rendezvous: signal that the method is executing.
                        exchanger.exchange(0L);
                        // Second rendezvous: receive the value to return.
                        return exchanger.exchange(0L);
                    }
                }));
            }
            catch (InterruptedException cause) {
                throw new RuntimeException(cause);
            }
        }
    });
    final AtomicInteger uncaught = new AtomicInteger();
    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            e.printStackTrace();
            uncaught.incrementAndGet();
        }
    });
    thread.start();
    // Wait until the method is executing and make sure that the counter has been incremented
    exchanger.exchange(0L, 5L, TimeUnit.SECONDS);
    assertThat("Counter count is incorrect", counter.getCount(), is(equalTo(COUNTER_COUNT.incrementAndGet())));
    // Exchange the result and unblock the method execution
    Long random = 1 + Math.round(Math.random() * (Long.MAX_VALUE - 1));
    exchanger.exchange(random, 5L, TimeUnit.SECONDS);
    // Wait until the method has returned; bounded so a worker that died inside the method cannot hang the test
    assertThat("Counted method return value is incorrect", exchanger.exchange(0L, 5L, TimeUnit.SECONDS), is(equalTo(random)));
    // Then make sure that the counter has not been decremented
    assertThat("Counter count is incorrect", counter.getCount(), is(equalTo(COUNTER_COUNT.get())));
    // Finally make sure the calling thread returns correctly
    thread.join();
    assertThat("Exception thrown in method call thread", uncaught.get(), is(equalTo(0)));
}
/**
 * Builds two independent top-level rules that share one step which blocks on an
 * {@link Exchanger}. Each build can only succeed if the other rule's step reaches
 * the exchanger within six seconds, i.e. if the engine runs both rules at once.
 */
@Test
public void multipleTopLevelRulesDontBlockEachOther() throws Exception {
    Exchanger<Boolean> exchanger = new Exchanger<>();
    Step exchangerStep =
        new AbstractExecutionStep("interleaved_step") {
            @Override
            public StepExecutionResult execute(ExecutionContext context)
                throws InterruptedException {
                try {
                    // Forces both rules to wait for the other at this point.
                    exchanger.exchange(true, 6, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    throw new RuntimeException(e);
                }
                return StepExecutionResults.SUCCESS;
            }
        };
    BuildRule interleavedRuleOne =
        createRule(
            filesystem,
            graphBuilder,
            /* deps */ ImmutableSortedSet.of(),
            /* buildSteps */ ImmutableList.of(exchangerStep),
            /* postBuildSteps */ ImmutableList.of(),
            /* pathToOutputFile */ null,
            ImmutableList.of(InternalFlavor.of("interleaved-1")));
    graphBuilder.addToIndex(interleavedRuleOne);
    BuildRule interleavedRuleTwo =
        createRule(
            filesystem,
            graphBuilder,
            /* deps */ ImmutableSortedSet.of(),
            /* buildSteps */ ImmutableList.of(exchangerStep),
            /* postBuildSteps */ ImmutableList.of(),
            /* pathToOutputFile */ null,
            ImmutableList.of(InternalFlavor.of("interleaved-2")));
    graphBuilder.addToIndex(interleavedRuleTwo);
    // The engine needs a couple of threads to ensure that it can schedule multiple steps at the
    // same time.
    ListeningExecutorService executorService =
        listeningDecorator(Executors.newFixedThreadPool(4));
    try (CachingBuildEngine cachingBuildEngine =
        cachingBuildEngineFactory().setExecutorService(executorService).build()) {
        BuildEngine.BuildEngineResult engineResultOne =
            cachingBuildEngine.build(
                buildContext, TestExecutionContext.newInstance(), interleavedRuleOne);
        BuildEngine.BuildEngineResult engineResultTwo =
            cachingBuildEngine.build(
                buildContext, TestExecutionContext.newInstance(), interleavedRuleTwo);
        assertThat(engineResultOne.getResult().get().getStatus(), equalTo(BuildRuleStatus.SUCCESS));
        assertThat(engineResultTwo.getResult().get().getStatus(), equalTo(BuildRuleStatus.SUCCESS));
    } finally {
        // Runs after the engine is closed, and also when a build or assertion fails,
        // so the pool's threads are never leaked.
        executorService.shutdown();
    }
}
/**
 * Invokes the bean's monotonic counted method once on a worker thread and checks
 * that the counter is incremented while the method runs and is not decremented
 * afterwards.
 *
 * <p>An {@link Exchanger} is used for three rendezvous between the test thread and
 * the worker: (1) the method has started, (2) the test supplies the value the
 * method returns, (3) the method has returned and reports that value. All three
 * waits on the test thread are bounded so a worker failure cannot hang the test.
 */
@Test
@InSequence(3)
public void callCountedMethodOnce() throws InterruptedException, TimeoutException {
    assertThat("Counter is not registered correctly", registry.getCounters(), hasKey(COUNTER_NAME));
    Counter counter = registry.getCounters().get(COUNTER_NAME);
    // Call the counted method, block and assert it's been counted
    final Exchanger<Long> exchanger = new Exchanger<>();
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                exchanger.exchange(bean.monotonicCountedMethod(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        // First rendezvous: signal that the method is executing.
                        exchanger.exchange(0L);
                        // Second rendezvous: receive the value to return.
                        return exchanger.exchange(0L);
                    }
                }));
            } catch (InterruptedException cause) {
                throw new RuntimeException(cause);
            }
        }
    });
    final AtomicInteger uncaught = new AtomicInteger();
    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            uncaught.incrementAndGet();
        }
    });
    thread.start();
    // Wait until the method is executing and make sure that the counter has been incremented
    exchanger.exchange(0L, 5L, TimeUnit.SECONDS);
    assertThat("Counter count is incorrect", counter.getCount(), is(equalTo(COUNTER_COUNT.incrementAndGet())));
    // Exchange the result and unblock the method execution
    Long random = 1 + Math.round(Math.random() * (Long.MAX_VALUE - 1));
    exchanger.exchange(random, 5L, TimeUnit.SECONDS);
    // Wait until the method has returned; bounded so a worker that died inside the method cannot hang the test
    assertThat("Counted method return value is incorrect", exchanger.exchange(0L, 5L, TimeUnit.SECONDS), is(equalTo(random)));
    // Then make sure that the counter has not been decremented
    assertThat("Counter count is incorrect", counter.getCount(), is(equalTo(COUNTER_COUNT.get())));
    // Finally make sure the calling thread returns correctly
    thread.join();
    assertThat("Exception thrown in method call thread", uncaught.get(), is(equalTo(0)));
}
/**
 * Invokes the bean's counted method once on a worker thread and checks that the
 * counter is incremented while the method runs and decremented after it returns.
 *
 * <p>An {@link Exchanger} is used for three rendezvous between the test thread and
 * the worker: (1) the method has started, (2) the test supplies the value the
 * method returns, (3) the method has returned and reports that value. All three
 * waits on the test thread are bounded so a worker failure cannot hang the test.
 */
@Test
@InSequence(3)
public void callCountedMethodOnce() throws InterruptedException, TimeoutException {
    assertThat("Counter is not registered correctly", registry.getCounters(), hasKey(COUNTER_NAME));
    Counter counter = registry.getCounters().get(COUNTER_NAME);
    // Call the counted method, block and assert it's been counted
    final Exchanger<Long> exchanger = new Exchanger<>();
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                exchanger.exchange(bean.countedMethod(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        // First rendezvous: signal that the method is executing.
                        exchanger.exchange(0L);
                        // Second rendezvous: receive the value to return.
                        return exchanger.exchange(0L);
                    }
                }));
            } catch (InterruptedException cause) {
                throw new RuntimeException(cause);
            }
        }
    });
    final AtomicInteger uncaught = new AtomicInteger();
    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            uncaught.incrementAndGet();
        }
    });
    thread.start();
    // Wait until the method is executing and make sure that the counter has been incremented
    exchanger.exchange(0L, 5L, TimeUnit.SECONDS);
    assertThat("Counter count is incorrect", counter.getCount(), is(equalTo(COUNTER_COUNT.incrementAndGet())));
    // Exchange the result and unblock the method execution
    Long random = 1 + Math.round(Math.random() * (Long.MAX_VALUE - 1));
    exchanger.exchange(random, 5L, TimeUnit.SECONDS);
    // Wait until the method has returned; bounded so a worker that died inside the method cannot hang the test
    assertThat("Counted method return value is incorrect", exchanger.exchange(0L, 5L, TimeUnit.SECONDS), is(equalTo(random)));
    // Then make sure that the counter has been decremented
    assertThat("Counter count is incorrect", counter.getCount(), is(equalTo(COUNTER_COUNT.decrementAndGet())));
    // Finally make sure the calling thread returns correctly
    thread.join();
    assertThat("Exception thrown in method call thread", uncaught.get(), is(equalTo(0)));
}