下面列出了java.util.concurrent.atomic.AtomicBoolean#notify() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Sync with mock app launcher.
*
* @param allowScheduling the allow scheduling
* @param mockAppLauncherGoFlag the mock app launcher go flag
* @param tezClient the tez client
* @throws Exception the exception
*/
void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag,
MockTezClient tezClient) throws Exception {
synchronized (mockAppLauncherGoFlag) {
while (!mockAppLauncherGoFlag.get()) {
mockAppLauncherGoFlag.wait();
}
mockApp = tezClient.getLocalClient().getMockApp();
mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(allowScheduling);
mockAppLauncherGoFlag.notify();
}
}
void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag,
MockTezClient tezClient) throws Exception {
synchronized (mockAppLauncherGoFlag) {
while (!mockAppLauncherGoFlag.get()) {
mockAppLauncherGoFlag.wait();
}
mockApp = tezClient.getLocalClient().getMockApp();
mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(allowScheduling);
mockAppLauncherGoFlag.notify();
}
}
@Test
public void testTransformListenersConcurrentModification() throws InterruptedException {
// Create a base transform
final BaseTransform baseTransform =
new BaseTransform( mockHelper.transformMeta, mockHelper.iTransformMeta, mockHelper.iTransformData, 0, mockHelper.pipelineMeta, mockHelper.pipeline );
// Create thread to dynamically add listeners
final AtomicBoolean done = new AtomicBoolean( false );
Thread addListeners = new Thread() {
@Override
public void run() {
while ( !done.get() ) {
baseTransform.addTransformFinishedListener( mock( ITransformFinishedListener.class ) );
synchronized ( done ) {
done.notify();
}
}
}
};
// Mark start and stop while listeners are being added
try {
addListeners.start();
// Allow a few listeners to be added
synchronized ( done ) {
while ( baseTransform.getTransformFinishedListeners().size() < 20 ) {
done.wait();
}
}
baseTransform.markStart();
// Allow more listeners to be added
synchronized ( done ) {
while ( baseTransform.getTransformFinishedListeners().size() < 100 ) {
done.wait();
}
}
baseTransform.markStop();
} finally {
// Close addListeners thread
done.set( true );
addListeners.join();
}
}
@Test
public void testStepListenersConcurrentModification() throws InterruptedException {
// Create a base step
final BaseStep baseStep =
new BaseStep( mockHelper.stepMeta, mockHelper.stepDataInterface, 0, mockHelper.transMeta, mockHelper.trans );
// Create thread to dynamically add listeners
final AtomicBoolean done = new AtomicBoolean( false );
Thread addListeners = new Thread() {
@Override
public void run() {
while ( !done.get() ) {
baseStep.addStepListener( mock( StepListener.class ) );
synchronized ( done ) {
done.notify();
}
}
}
};
// Mark start and stop while listeners are being added
try {
addListeners.start();
// Allow a few listeners to be added
synchronized ( done ) {
while ( baseStep.getStepListeners().size() < 20 ) {
done.wait();
}
}
baseStep.markStart();
// Allow more listeners to be added
synchronized ( done ) {
while ( baseStep.getStepListeners().size() < 100 ) {
done.wait();
}
}
baseStep.markStop();
} finally {
// Close addListeners thread
done.set( true );
addListeners.join();
}
}