下面列出了 com.google.common.util.concurrent.Uninterruptibles#joinUninterruptibly() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Stops and removes a volume scanner.
 *
 * <p>The scanner is shut down, unregistered from {@code scanners}, and then
 * joined while this object's monitor is held. The join is bounded at five
 * minutes; if the scanner thread is still alive after that, a warning is
 * logged and the method returns anyway.
 *
 * <p>Nothing is done when the block scanner is disabled or when no scanner
 * is registered for the volume's storage ID.
 *
 * @param volume The volume to remove.
 */
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
  if (!isEnabled()) {
    LOG.debug("Not removing volume scanner for {}, because the block " +
        "scanner is disabled.", volume.getStorageID());
    return;
  }
  VolumeScanner scanner = scanners.get(volume.getStorageID());
  if (scanner == null) {
    LOG.warn("No scanner found to remove for volumeId {}",
        volume.getStorageID());
    return;
  }
  LOG.info("Removing scanner for volume {} (StorageID {})",
      volume.getBasePath(), volume.getStorageID());
  scanner.shutdown();
  scanners.remove(volume.getStorageID());
  Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
  // The timed join returns void, so a timeout is only detectable by checking
  // whether the thread is still running. Surface it instead of silently
  // returning with a live scanner thread.
  if (scanner.isAlive()) {
    LOG.warn("Scanner for volume {} (StorageID {}) did not stop within " +
        "5 minutes", volume.getBasePath(), volume.getStorageID());
  }
}
/**
 * Closes the DomainSocketWatcher and waits for its thread to terminate.
 *
 * Only the first call does any work: later calls observe the
 * {@code closed} flag under the lock and return immediately.
 *
 * @throws IOException presumably when closing the notification socket
 *                     fails (confirm against the socket's close method).
 */
@Override
public void close() throws IOException {
  final boolean firstClose;
  lock.lock();
  try {
    firstClose = !closed;
    if (firstClose) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(this + ": closing");
      }
      closed = true;
    }
  } finally {
    lock.unlock();
  }
  if (!firstClose) {
    return;
  }
  // Closing notificationSockets[0] makes notificationSockets[1] see an EOF
  // event, which wakes the thread immediately if it is blocked inside the
  // select() system call.
  notificationSockets[0].close();
  // Block until the select thread has exited.
  Uninterruptibles.joinUninterruptibly(watcherThread);
}
/**
 * Stops and removes a volume scanner.
 *
 * <p>The scanner is shut down, unregistered from {@code scanners}, and then
 * joined while this object's monitor is held. The join is bounded at five
 * minutes; if the scanner thread is still alive after that, a warning is
 * logged and the method returns anyway.
 *
 * <p>Nothing is done when the block scanner is disabled or when no scanner
 * is registered for the volume's storage ID.
 *
 * @param volume The volume to remove.
 */
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
  if (!isEnabled()) {
    LOG.debug("Not removing volume scanner for {}, because the block " +
        "scanner is disabled.", volume.getStorageID());
    return;
  }
  VolumeScanner scanner = scanners.get(volume.getStorageID());
  if (scanner == null) {
    LOG.warn("No scanner found to remove for volumeId {}",
        volume.getStorageID());
    return;
  }
  LOG.info("Removing scanner for volume {} (StorageID {})",
      volume.getBasePath(), volume.getStorageID());
  scanner.shutdown();
  scanners.remove(volume.getStorageID());
  Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
  // The timed join returns void, so a timeout is only detectable by checking
  // whether the thread is still running. Surface it instead of silently
  // returning with a live scanner thread.
  if (scanner.isAlive()) {
    LOG.warn("Scanner for volume {} (StorageID {}) did not stop within " +
        "5 minutes", volume.getBasePath(), volume.getStorageID());
  }
}
/**
 * Closes the DomainSocketWatcher and waits for its thread to terminate.
 *
 * Only the first call does any work: later calls observe the
 * {@code closed} flag under the lock and return immediately.
 *
 * @throws IOException presumably when closing the notification socket
 *                     fails (confirm against the socket's close method).
 */
@Override
public void close() throws IOException {
  final boolean firstClose;
  lock.lock();
  try {
    firstClose = !closed;
    if (firstClose) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(this + ": closing");
      }
      closed = true;
    }
  } finally {
    lock.unlock();
  }
  if (!firstClose) {
    return;
  }
  // Closing notificationSockets[0] makes notificationSockets[1] see an EOF
  // event, which wakes the thread immediately if it is blocked inside the
  // select() system call.
  notificationSockets[0].close();
  // Block until the select thread has exited.
  Uninterruptibles.joinUninterruptibly(watcherThread);
}
/**
 * Checks that asking a service to transition into the state it already
 * occupies is a no-op and, in particular, raises no exception.
 */
@Test(timeout = 300000)
public void testTransitionToCurrentStateIsANop() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(1)
      .build();
  LinkedList<Thread> monitorThreads = new LinkedList<Thread>();
  try {
    cluster.waitActive();
    addCrmThreads(cluster, monitorThreads);
    // Each target state is requested twice in a row; the second request asks
    // for the state that node 0 is already in.
    for (int attempt = 0; attempt < 2; attempt++) {
      cluster.transitionToActive(0);
      addCrmThreads(cluster, monitorThreads);
    }
    for (int attempt = 0; attempt < 2; attempt++) {
      cluster.transitionToStandby(0);
      addCrmThreads(cluster, monitorThreads);
    }
  } finally {
    cluster.shutdown();
  }
  // Every collected cacheReplicationMonitor thread must have shut down.
  for (Thread monitor : monitorThreads) {
    Uninterruptibles.joinUninterruptibly(monitor);
  }
}
/**
 * Exercises the Statistics counters: increments from the test thread and
 * from a second thread, then the copy constructor and reset().
 */
@Test(timeout=60000)
public void testStatisticsOperations() throws Exception {
  final Statistics stats = new Statistics("file");
  Assert.assertEquals(0L, stats.getBytesRead());
  Assert.assertEquals(0L, stats.getBytesWritten());
  Assert.assertEquals(0, stats.getWriteOps());

  stats.incrementBytesWritten(1000);
  Assert.assertEquals(1000L, stats.getBytesWritten());
  Assert.assertEquals(0, stats.getWriteOps());

  stats.incrementWriteOps(123);
  Assert.assertEquals(123, stats.getWriteOps());

  // One more write op recorded from a different thread must show up in the
  // total once that thread has finished.
  Runnable singleIncrement = new Runnable() {
    @Override
    public void run() {
      stats.incrementWriteOps(1);
    }
  };
  Thread incrementer = new Thread(singleIncrement);
  incrementer.start();
  Uninterruptibles.joinUninterruptibly(incrementer);
  Assert.assertEquals(124, stats.getWriteOps());

  // Copy constructor and reset: the copy keeps the values, the original
  // goes back to zero.
  Statistics snapshot = new Statistics(stats);
  stats.reset();
  Assert.assertEquals(0, stats.getWriteOps());
  Assert.assertEquals(0L, stats.getBytesWritten());
  Assert.assertEquals(0L, stats.getBytesRead());
  Assert.assertEquals(124, snapshot.getWriteOps());
  Assert.assertEquals(1000L, snapshot.getBytesWritten());
  Assert.assertEquals(0L, snapshot.getBytesRead());
}
/**
 * Verifies that interrupting the watcher thread makes it exit: the join
 * below only returns once the thread has terminated.
 */
@Test(timeout=60000)
public void testInterruption() throws Exception {
  final DomainSocketWatcher watcher = newDomainSocketWatcher(10);
  final Thread selectThread = watcher.watcherThread;
  selectThread.interrupt();
  Uninterruptibles.joinUninterruptibly(selectThread);
  watcher.close();
}
/**
 * Verifies that a domain socket registered with the watcher is closed once
 * the watcher itself has been closed.
 */
@Test(timeout=300000)
public void testCloseSocketOnWatcherClose() throws Exception {
  final DomainSocketWatcher watcher = newDomainSocketWatcher(10000000);
  DomainSocket[] socketPair = DomainSocket.socketpair();
  DomainSocketWatcher.Handler alwaysTrue = new DomainSocketWatcher.Handler() {
    @Override
    public boolean handle(DomainSocket sock) {
      return true;
    }
  };
  watcher.add(socketPair[1], alwaysTrue);
  watcher.close();
  Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
  assertFalse(socketPair[1].isOpen());
}
/**
 * Checks that asking a service to transition into the state it already
 * occupies is a no-op and, in particular, raises no exception.
 */
@Test(timeout = 300000)
public void testTransitionToCurrentStateIsANop() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(1)
      .build();
  LinkedList<Thread> monitorThreads = new LinkedList<Thread>();
  try {
    cluster.waitActive();
    addCrmThreads(cluster, monitorThreads);
    // Each target state is requested twice in a row; the second request asks
    // for the state that node 0 is already in.
    for (int attempt = 0; attempt < 2; attempt++) {
      cluster.transitionToActive(0);
      addCrmThreads(cluster, monitorThreads);
    }
    for (int attempt = 0; attempt < 2; attempt++) {
      cluster.transitionToStandby(0);
      addCrmThreads(cluster, monitorThreads);
    }
  } finally {
    cluster.shutdown();
  }
  // Every collected cacheReplicationMonitor thread must have shut down.
  for (Thread monitor : monitorThreads) {
    Uninterruptibles.joinUninterruptibly(monitor);
  }
}
/**
 * Exercises the Statistics counters: increments from the test thread and
 * from a second thread, then the copy constructor and reset().
 */
@Test(timeout=60000)
public void testStatisticsOperations() throws Exception {
  final Statistics stats = new Statistics("file");
  Assert.assertEquals(0L, stats.getBytesRead());
  Assert.assertEquals(0L, stats.getBytesWritten());
  Assert.assertEquals(0, stats.getWriteOps());

  stats.incrementBytesWritten(1000);
  Assert.assertEquals(1000L, stats.getBytesWritten());
  Assert.assertEquals(0, stats.getWriteOps());

  stats.incrementWriteOps(123);
  Assert.assertEquals(123, stats.getWriteOps());

  // One more write op recorded from a different thread must show up in the
  // total once that thread has finished.
  Runnable singleIncrement = new Runnable() {
    @Override
    public void run() {
      stats.incrementWriteOps(1);
    }
  };
  Thread incrementer = new Thread(singleIncrement);
  incrementer.start();
  Uninterruptibles.joinUninterruptibly(incrementer);
  Assert.assertEquals(124, stats.getWriteOps());

  // Copy constructor and reset: the copy keeps the values, the original
  // goes back to zero.
  Statistics snapshot = new Statistics(stats);
  stats.reset();
  Assert.assertEquals(0, stats.getWriteOps());
  Assert.assertEquals(0L, stats.getBytesWritten());
  Assert.assertEquals(0L, stats.getBytesRead());
  Assert.assertEquals(124, snapshot.getWriteOps());
  Assert.assertEquals(1000L, snapshot.getBytesWritten());
  Assert.assertEquals(0L, snapshot.getBytesRead());
}
/**
 * Verifies that interrupting the watcher thread makes it exit: the join
 * below only returns once the thread has terminated.
 */
@Test(timeout=60000)
public void testInterruption() throws Exception {
  final DomainSocketWatcher watcher = newDomainSocketWatcher(10);
  final Thread selectThread = watcher.watcherThread;
  selectThread.interrupt();
  Uninterruptibles.joinUninterruptibly(selectThread);
  watcher.close();
}
/**
 * Verifies that a domain socket registered with the watcher is closed once
 * the watcher itself has been closed.
 */
@Test(timeout=300000)
public void testCloseSocketOnWatcherClose() throws Exception {
  final DomainSocketWatcher watcher = newDomainSocketWatcher(10000000);
  DomainSocket[] socketPair = DomainSocket.socketpair();
  DomainSocketWatcher.Handler alwaysTrue = new DomainSocketWatcher.Handler() {
    @Override
    public boolean handle(DomainSocket sock) {
      return true;
    }
  };
  watcher.add(socketPair[1], alwaysTrue);
  watcher.close();
  Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
  assertFalse(socketPair[1].isOpen());
}
/**
 * Stops the watcher thread, waits for it to finish, and then copies the test
 * log to the output stream if the header filter never saw a header.
 *
 * If the wait is interrupted, the interrupt flag is restored, the watcher is
 * interrupted, and it is given a bounded grace period to exit; a watcher
 * still alive after that fails a state check.
 *
 * @throws IOException presumably when reading or copying the test log fails
 *                     (confirm against the stream helpers used below).
 */
@Override
public void close() throws IOException {
  watcher.stopPumping();
  try {
    // An interrupt during this call may leak the watcher thread. That is a
    // relatively minor issue: the worst it could do is write one additional
    // line from the test.log to the console later on in the build.
    watcher.join();
  } catch (InterruptedException interrupted) {
    Thread.currentThread().interrupt();
    watcher.interrupt();
    Uninterruptibles.joinUninterruptibly(
        watcher, JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS, TimeUnit.SECONDS);
    boolean watcherExited = !watcher.isAlive();
    Preconditions.checkState(
        watcherExited,
        "Watcher thread failed to exit for %s seconds after interrupt",
        JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS);
  }
  // Whether this should still be written after an interrupt is unclear, but
  // it has always been done this way.
  if (headerFilter.foundHeader()) {
    return;
  }
  try (InputStream logStream = testLogPath.getInputStream()) {
    ByteStreams.copy(logStream, outErr.getOutputStream());
  }
}
/**
 * Signals shutdown, then interrupts the update thread and waits for it to
 * terminate. Does nothing beyond setting the flag when no update thread is
 * registered.
 *
 * The update thread is a separate thread and might have to call a
 * synchronized method between being interrupted and terminating, so DO NOT
 * CALL this from a SYNCHRONIZED block: doing so opens the door to deadlocks.
 */
private void stopUpdateThread() {
  shutdown = true;
  // Atomically take ownership of the thread reference so it is joined once.
  Thread pending = updateThread.getAndSet(null);
  if (pending == null) {
    return;
  }
  pending.interrupt();
  Uninterruptibles.joinUninterruptibly(pending);
}
/**
 * Asks the consuming thread to stop and, unless called from that thread
 * itself, waits up to 10 minutes for it to exit.
 *
 * @throws InterruptedException declared in the signature; the join used
 *                              here is the uninterruptible variant.
 */
public void stop()
    throws InterruptedException {
  _shouldStop = true;
  // Two callers reach this method: an ONLINE transition, and the consumer
  // itself when it commits a segment and swaps the realtime segment for the
  // committed one. In the second case joining would mean waiting on
  // ourselves, so skip it.
  if (Thread.currentThread() == _consumerThread) {
    return;
  }
  Uninterruptibles.joinUninterruptibly(_consumerThread, 10, TimeUnit.MINUTES);
  if (_consumerThread.isAlive()) {
    segmentLogger.warn("Failed to stop consumer thread within 10 minutes");
  }
}
/**
 * Concurrent read from the same node and verify the contents.
 *
 * Several reader threads verify the same file at once. Any failure inside a
 * reader is logged and recorded in a shared flag, which is asserted after
 * every reader has been joined.
 */
@Test
public void testConcurrentRead()
    throws Exception {
  startUpCluster(true, 2);
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  final Path path1 = new Path("/" + METHOD_NAME + ".dat");
  final int SEED = 0xFADED;
  final int NUM_TASKS = 5;
  makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);

  // Read from multiple clients.
  final AtomicBoolean testFailed = new AtomicBoolean(false);
  Runnable readerRunnable = new Runnable() {
    @Override
    public void run() {
      try {
        Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
      } catch (Throwable e) {
        // Assertion errors thrown on a reader thread would otherwise be
        // lost, so record them for the main thread.
        LOG.error("readerRunnable error", e);
        testFailed.set(true);
      }
    }
  };

  Thread[] threads = new Thread[NUM_TASKS];
  for (int i = 0; i < NUM_TASKS; i++) {
    threads[i] = new Thread(readerRunnable);
    threads[i].start();
  }

  // Joining is what guarantees every reader has finished; no latch or fixed
  // sleep is needed on top of it.
  for (Thread reader : threads) {
    Uninterruptibles.joinUninterruptibly(reader);
  }
  Assert.assertFalse(testFailed.get());
}
/**
 * Test the case where we have multiple threads waiting on the
 * ShortCircuitCache delivering a certain ShortCircuitReplica.
 *
 * In this case, there should only be one call to
 * createShortCircuitReplicaInfo. This one replica should be shared
 * by all threads.
 *
 * The cluster and the temporary socket directory are released in finally
 * blocks so that a failure part-way through the test does not leak them.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache()
    throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean testFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
      new ShortCircuitCache.ShortCircuitReplicaCreator() {
        @Override
        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
          // Hold replica creation until the main thread releases the latch.
          Uninterruptibles.awaitUninterruptibly(latch);
          if (!creationIsBlocked.compareAndSet(true, false)) {
            Assert.fail("there were multiple calls to "
                + "createShortCircuitReplicaInfo.  Only one was expected.");
          }
          return null;
        }
      };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testMultipleWaitersOnShortCircuitCache", sockDir);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem dfs = cluster.getFileSystem();
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADED;
      final int NUM_THREADS = 10;
      DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      Runnable readerRunnable = new Runnable() {
        @Override
        public void run() {
          try {
            byte[] contents =
                DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
            Assert.assertFalse(creationIsBlocked.get());
            byte[] expected = DFSTestUtil.
                calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
            Assert.assertTrue(Arrays.equals(contents, expected));
          } catch (Throwable e) {
            // Failures on reader threads are recorded for the main thread.
            LOG.error("readerRunnable error", e);
            testFailed.set(true);
          }
        }
      };
      Thread[] threads = new Thread[NUM_THREADS];
      for (int i = 0; i < NUM_THREADS; i++) {
        threads[i] = new Thread(readerRunnable);
        threads[i].start();
      }
      // Give the readers time to start and pile up before creation is
      // unblocked.
      Thread.sleep(500);
      latch.countDown();
      for (int i = 0; i < NUM_THREADS; i++) {
        Uninterruptibles.joinUninterruptibly(threads[i]);
      }
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
  Assert.assertFalse(testFailed.get());
}
/**
 * Concurrent read from the same node and verify the contents.
 *
 * Several reader threads verify the same file at once. Any failure inside a
 * reader is logged and recorded in a shared flag, which is asserted after
 * every reader has been joined.
 */
@Test
public void testConcurrentRead()
    throws Exception {
  startUpCluster(true, 2);
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  final Path path1 = new Path("/" + METHOD_NAME + ".dat");
  final int SEED = 0xFADED;
  final int NUM_TASKS = 5;
  makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);

  // Read from multiple clients.
  final AtomicBoolean testFailed = new AtomicBoolean(false);
  Runnable readerRunnable = new Runnable() {
    @Override
    public void run() {
      try {
        Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
      } catch (Throwable e) {
        // Assertion errors thrown on a reader thread would otherwise be
        // lost, so record them for the main thread.
        LOG.error("readerRunnable error", e);
        testFailed.set(true);
      }
    }
  };

  Thread[] threads = new Thread[NUM_TASKS];
  for (int i = 0; i < NUM_TASKS; i++) {
    threads[i] = new Thread(readerRunnable);
    threads[i].start();
  }

  // Joining is what guarantees every reader has finished; no latch or fixed
  // sleep is needed on top of it.
  for (Thread reader : threads) {
    Uninterruptibles.joinUninterruptibly(reader);
  }
  Assert.assertFalse(testFailed.get());
}
/**
 * Test the case where we have multiple threads waiting on the
 * ShortCircuitCache delivering a certain ShortCircuitReplica.
 *
 * In this case, there should only be one call to
 * createShortCircuitReplicaInfo. This one replica should be shared
 * by all threads.
 *
 * The cluster and the temporary socket directory are released in finally
 * blocks so that a failure part-way through the test does not leak them.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache()
    throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean testFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
      new ShortCircuitCache.ShortCircuitReplicaCreator() {
        @Override
        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
          // Hold replica creation until the main thread releases the latch.
          Uninterruptibles.awaitUninterruptibly(latch);
          if (!creationIsBlocked.compareAndSet(true, false)) {
            Assert.fail("there were multiple calls to "
                + "createShortCircuitReplicaInfo.  Only one was expected.");
          }
          return null;
        }
      };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testMultipleWaitersOnShortCircuitCache", sockDir);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem dfs = cluster.getFileSystem();
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADED;
      final int NUM_THREADS = 10;
      DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      Runnable readerRunnable = new Runnable() {
        @Override
        public void run() {
          try {
            byte[] contents =
                DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
            Assert.assertFalse(creationIsBlocked.get());
            byte[] expected = DFSTestUtil.
                calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
            Assert.assertTrue(Arrays.equals(contents, expected));
          } catch (Throwable e) {
            // Failures on reader threads are recorded for the main thread.
            LOG.error("readerRunnable error", e);
            testFailed.set(true);
          }
        }
      };
      Thread[] threads = new Thread[NUM_THREADS];
      for (int i = 0; i < NUM_THREADS; i++) {
        threads[i] = new Thread(readerRunnable);
        threads[i].start();
      }
      // Give the readers time to start and pile up before creation is
      // unblocked.
      Thread.sleep(500);
      latch.countDown();
      for (int i = 0; i < NUM_THREADS; i++) {
        Uninterruptibles.joinUninterruptibly(threads[i]);
      }
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
  Assert.assertFalse(testFailed.get());
}