下面列出了org.apache.hadoop.io.MultipleIOException#org.apache.hadoop.hdfs.server.datanode.StorageLocation 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
* between all configured storage locations and the actual storage locations in
* use after attempting to put all of them into service.
*
* @return each storage location that has failed
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
failedLocationSet.add(sl.getFile().getAbsolutePath());
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
for (String failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
return volumeFailureInfos;
}
/**
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
* between all configured storage locations and the actual storage locations in
* use after attempting to put all of them into service.
*
* @return each storage location that has failed
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
failedLocationSet.add(sl.getFile().getAbsolutePath());
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
for (String failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
return volumeFailureInfos;
}
@Before
public void setupPaths() throws IOException {
containerSet = new ContainerSet();
volumeSet = new MutableVolumeSet(DATANODE_UUID, conf);
blockManager = new BlockManagerImpl(conf);
chunkManager = ChunkManagerFactory.createChunkManager(conf, blockManager);
for (String dir : conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)) {
StorageLocation location = StorageLocation.parse(dir);
FileUtils.forceMkdir(new File(location.getNormalizedUri()));
}
}
@After
public void cleanupDir() throws IOException {
// Clean up SCM metadata
log.info("Deleting {}", hddsPath);
FileUtils.deleteDirectory(new File(hddsPath));
// Clean up SCM datanode container metadata/data
for (String dir : conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)) {
StorageLocation location = StorageLocation.parse(dir);
FileUtils.deleteDirectory(new File(location.getNormalizedUri()));
}
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
synchronized (this) {
volumeMap.addAll(tempVolumeMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
if (dataLocation.getFile().equals(dir)) {
return dataLocation.getStorageType();
}
}
return StorageType.DEFAULT;
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size();
final int totalVolumes = numNewVolumes + numExistingVolumes;
Set<String> expectedVolumes = new HashSet<String>();
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
String pathUri = new Path(path).toUri().toString();
expectedVolumes.add(new File(pathUri).toString());
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
}
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());
Set<String> actualVolumes = new HashSet<String>();
for (int i = 0; i < numNewVolumes; i++) {
actualVolumes.add(
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
}
assertEquals(actualVolumes.size(), expectedVolumes.size());
assertTrue(actualVolumes.containsAll(expectedVolumes));
}
@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = dataset.getVolumes().size();
List<NamespaceInfo> nsInfos = new ArrayList<>();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
StorageLocation loc = StorageLocation.parse(newVolumePath);
Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
assertEquals(numExistingVolumes + 1, dataset.getVolumes().size());
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(loc.getFile());
dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, dataset.getVolumes().size());
}
@Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));
Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);
StorageLocation location = StorageLocation.parse(badDir.toString());
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
try {
spyDataset.addVolume(location, nsInfos);
fail("Expect to throw MultipleIOException");
} catch (MultipleIOException e) {
}
FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
synchronized (this) {
volumeMap.addAll(tempVolumeMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
if (dataLocation.getFile().equals(dir)) {
return dataLocation.getStorageType();
}
}
return StorageType.DEFAULT;
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size();
final int totalVolumes = numNewVolumes + numExistingVolumes;
Set<String> expectedVolumes = new HashSet<String>();
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
String pathUri = new Path(path).toUri().toString();
expectedVolumes.add(new File(pathUri).toString());
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
}
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());
Set<String> actualVolumes = new HashSet<String>();
for (int i = 0; i < numNewVolumes; i++) {
actualVolumes.add(
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
}
assertEquals(actualVolumes.size(), expectedVolumes.size());
assertTrue(actualVolumes.containsAll(expectedVolumes));
}
@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = dataset.getVolumes().size();
List<NamespaceInfo> nsInfos = new ArrayList<>();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
StorageLocation loc = StorageLocation.parse(newVolumePath);
Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
assertEquals(numExistingVolumes + 1, dataset.getVolumes().size());
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(loc.getFile());
dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, dataset.getVolumes().size());
}
@Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));
Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);
StorageLocation location = StorageLocation.parse(badDir.toString());
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
try {
spyDataset.addVolume(location, nsInfos);
fail("Expect to throw MultipleIOException");
} catch (MultipleIOException e) {
}
FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
}
/**
* Add DN volumes configured through ConfigKeys to volumeMap.
*/
private void initializeVolumeSet() throws IOException {
volumeMap = new ConcurrentHashMap<>();
failedVolumeMap = new ConcurrentHashMap<>();
volumeStateMap = new EnumMap<>(StorageType.class);
Collection<String> rawLocations = getDatanodeStorageDirs(conf);
for (StorageType storageType : StorageType.values()) {
volumeStateMap.put(storageType, new ArrayList<>());
}
for (String locationString : rawLocations) {
try {
StorageLocation location = StorageLocation.parse(locationString);
HddsVolume hddsVolume = createVolume(location.getUri().getPath(),
location.getStorageType());
checkAndSetClusterID(hddsVolume.getClusterID());
LOG.info("Added Volume : {} to VolumeSet",
hddsVolume.getHddsRootDir().getPath());
if (!hddsVolume.getHddsRootDir().mkdirs() &&
!hddsVolume.getHddsRootDir().exists()) {
throw new IOException("Failed to create HDDS storage dir " +
hddsVolume.getHddsRootDir());
}
volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
} catch (IOException e) {
HddsVolume volume = new HddsVolume.Builder(locationString)
.failedVolume(true).build();
failedVolumeMap.put(locationString, volume);
LOG.error("Failed to parse the storage location: " + locationString, e);
}
}
// First checking if we have any volumes, if all volumes are failed the
// volumeMap size will be zero, and we throw Exception.
if (volumeMap.size() == 0) {
throw new DiskOutOfSpaceException("No storage locations configured");
}
checkAllVolumes();
// Ensure volume threads are stopped and scm df is saved during shutdown.
shutdownHook = () -> {
saveVolumeSetUsed();
};
ShutdownHookManager.get().addShutdownHook(shutdownHook,
SHUTDOWN_HOOK_PRIORITY);
}
@Test
public void testRandomChoosingPolicy() throws IOException {
File containerDir = new File(path);
if (containerDir.exists()) {
FileUtils.deleteDirectory(new File(path));
}
Assert.assertTrue(containerDir.mkdirs());
conf.set(
ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
RandomContainerDeletionChoosingPolicy.class.getName());
List<StorageLocation> pathLists = new LinkedList<>();
pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath()));
containerSet = new ContainerSet();
int numContainers = 10;
for (int i = 0; i < numContainers; i++) {
KeyValueContainerData data = new KeyValueContainerData(i,
layout,
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
data.closeContainer();
KeyValueContainer container = new KeyValueContainer(data, conf);
containerSet.addContainer(container);
Assert.assertTrue(
containerSet.getContainerMapCopy()
.containsKey(data.getContainerID()));
}
blockDeletingService = getBlockDeletingService();
ContainerDeletionChoosingPolicy deletionPolicy =
new RandomContainerDeletionChoosingPolicy();
List<ContainerData> result0 =
blockDeletingService.chooseContainerForBlockDeletion(5, deletionPolicy);
Assert.assertEquals(5, result0.size());
// test random choosing
List<ContainerData> result1 = blockDeletingService
.chooseContainerForBlockDeletion(numContainers, deletionPolicy);
List<ContainerData> result2 = blockDeletingService
.chooseContainerForBlockDeletion(numContainers, deletionPolicy);
boolean hasShuffled = false;
for (int i = 0; i < numContainers; i++) {
if (result1.get(i).getContainerID()
!= result2.get(i).getContainerID()) {
hasShuffled = true;
break;
}
}
Assert.assertTrue("Chosen container results were same", hasShuffled);
}
/**
* An FSDataset has a directory where it loads its data files.
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.fsRunning = true;
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
dataLocations, storage);
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
int volsFailed = volumeFailureInfos.size();
this.validVolsRequired = volsConfigured - volFailuresTolerated;
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
throw new DiskErrorException("Invalid volume failure "
+ " config value: " + volFailuresTolerated);
}
if (volsFailed > volFailuresTolerated) {
throw new DiskErrorException("Too many failed volumes - "
+ "current valid volumes: " + storage.getNumStorageDirs()
+ ", volumes configured: " + volsConfigured
+ ", volumes failed: " + volsFailed
+ ", volume failures tolerated: " + volFailuresTolerated);
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
deletingBlock = new HashMap<String, Set<Long>>();
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx));
}
setupAsyncLazyPersistThreads();
cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
lazyWriter = new Daemon(new LazyWriter(conf));
lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
localFS = FileSystem.getLocal(conf);
blockPinningEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
}
@Test(timeout = 30000)
public void testGetReconfigureStatus()
throws IOException, InterruptedException {
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
datanode.setReconfigurationUtil(ru);
List<ReconfigurationUtil.PropertyChange> changes =
new ArrayList<ReconfigurationUtil.PropertyChange>();
File newDir = new File(cluster.getDataDirectory(), "data_new");
newDir.mkdirs();
changes.add(new ReconfigurationUtil.PropertyChange(
DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
changes.add(new ReconfigurationUtil.PropertyChange(
"randomKey", "new123", "old456"));
when(ru.parseChangedProperties(any(Configuration.class),
any(Configuration.class))).thenReturn(changes);
final int port = datanode.getIpcPort();
final String address = "localhost:" + port;
assertThat(admin.startReconfiguration("datanode", address), is(0));
List<String> outputs = null;
int count = 100;
while (count > 0) {
outputs = getReconfigureStatus("datanode", address);
if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
break;
}
count--;
Thread.sleep(100);
}
assertTrue(count > 0);
assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED)
List<StorageLocation> locations = DataNode.getStorageLocations(
datanode.getConf());
assertThat(locations.size(), is(1));
assertThat(locations.get(0).getFile(), is(newDir));
// Verify the directory is appropriately formatted.
assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4;
assertThat(outputs.get(successOffset),
containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
assertThat(outputs.get(successOffset + 1),
is(allOf(containsString("From:"), containsString("data1"),
containsString("data2"))));
assertThat(outputs.get(successOffset + 2),
is(not(anyOf(containsString("data1"), containsString("data2")))));
assertThat(outputs.get(successOffset + 2),
is(allOf(containsString("To"), containsString("data_new"))));
assertThat(outputs.get(failedOffset),
containsString("Change property randomKey"));
assertThat(outputs.get(failedOffset + 1),
containsString("From: \"old456\""));
assertThat(outputs.get(failedOffset + 2),
containsString("To: \"new123\""));
}
/**
* An FSDataset has a directory where it loads its data files.
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.fsRunning = true;
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
dataLocations, storage);
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
int volsFailed = volumeFailureInfos.size();
this.validVolsRequired = volsConfigured - volFailuresTolerated;
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
throw new DiskErrorException("Invalid volume failure "
+ " config value: " + volFailuresTolerated);
}
if (volsFailed > volFailuresTolerated) {
throw new DiskErrorException("Too many failed volumes - "
+ "current valid volumes: " + storage.getNumStorageDirs()
+ ", volumes configured: " + volsConfigured
+ ", volumes failed: " + volsFailed
+ ", volume failures tolerated: " + volFailuresTolerated);
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
deletingBlock = new HashMap<String, Set<Long>>();
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx));
}
setupAsyncLazyPersistThreads();
cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
lazyWriter = new Daemon(new LazyWriter(conf));
lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
localFS = FileSystem.getLocal(conf);
blockPinningEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
}
@Test(timeout = 30000)
public void testGetReconfigureStatus()
throws IOException, InterruptedException {
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
datanode.setReconfigurationUtil(ru);
List<ReconfigurationUtil.PropertyChange> changes =
new ArrayList<ReconfigurationUtil.PropertyChange>();
File newDir = new File(cluster.getDataDirectory(), "data_new");
newDir.mkdirs();
changes.add(new ReconfigurationUtil.PropertyChange(
DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
changes.add(new ReconfigurationUtil.PropertyChange(
"randomKey", "new123", "old456"));
when(ru.parseChangedProperties(any(Configuration.class),
any(Configuration.class))).thenReturn(changes);
final int port = datanode.getIpcPort();
final String address = "localhost:" + port;
assertThat(admin.startReconfiguration("datanode", address), is(0));
List<String> outputs = null;
int count = 100;
while (count > 0) {
outputs = getReconfigureStatus("datanode", address);
if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
break;
}
count--;
Thread.sleep(100);
}
assertTrue(count > 0);
assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED)
List<StorageLocation> locations = DataNode.getStorageLocations(
datanode.getConf());
assertThat(locations.size(), is(1));
assertThat(locations.get(0).getFile(), is(newDir));
// Verify the directory is appropriately formatted.
assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4;
assertThat(outputs.get(successOffset),
containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
assertThat(outputs.get(successOffset + 1),
is(allOf(containsString("From:"), containsString("data1"),
containsString("data2"))));
assertThat(outputs.get(successOffset + 2),
is(not(anyOf(containsString("data1"), containsString("data2")))));
assertThat(outputs.get(successOffset + 2),
is(allOf(containsString("To"), containsString("data_new"))));
assertThat(outputs.get(failedOffset),
containsString("Change property randomKey"));
assertThat(outputs.get(failedOffset + 1),
containsString("From: \"old456\""));
assertThat(outputs.get(failedOffset + 2),
containsString("To: \"new123\""));
}
/**
* Add a new volume to the FsDataset.<p/>
*
* If the FSDataset supports block scanning, this function registers
* the new volume with the block scanner.
*
* @param location The storage location for the new volume.
* @param nsInfos Namespace information for the new volume.
*/
public void addVolume(
final StorageLocation location,
final List<NamespaceInfo> nsInfos) throws IOException;
/**
* Add a new volume to the FsDataset.<p/>
*
* If the FSDataset supports block scanning, this function registers
* the new volume with the block scanner.
*
* @param location The storage location for the new volume.
* @param nsInfos Namespace information for the new volume.
*/
public void addVolume(
final StorageLocation location,
final List<NamespaceInfo> nsInfos) throws IOException;