下面列出了org.apache.hadoop.io.MultipleIOException#org.apache.hadoop.hdfs.server.common.Storage 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
String fileid = ImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(
NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
dstFiles.get(0).length() + " bytes.");
return hash;
}
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
long imageTxId, Storage dstStorage, InputStream stream,
long advertisedSize, DataTransferThrottler throttler) throws IOException {
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash advertisedDigest = parseMD5Header(request);
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
return hash;
}
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
JournalInfo journalInfo) // active name-node
throws IOException {
super();
this.bnRegistration = bnReg;
this.journalInfo = journalInfo;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
try {
this.backupNode = NameNodeProxies.createNonHAProxy(new HdfsConfiguration(),
bnAddress, JournalProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy();
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
/**
* Read the header of fsedit log
* @param in fsedit stream
* @return the edit log version number
* @throws IOException if error occurs
*/
@VisibleForTesting
static int readLogVersion(DataInputStream in, boolean verifyLayoutVersion)
throws IOException, LogHeaderCorruptException {
int logVersion;
try {
logVersion = in.readInt();
} catch (EOFException eofe) {
throw new LogHeaderCorruptException(
"Reached EOF when reading log header");
}
if (verifyLayoutVersion &&
(logVersion < HdfsConstants.NAMENODE_LAYOUT_VERSION || // future version
logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION)) { // unsupported
throw new LogHeaderCorruptException(
"Unexpected version of the file system log file: "
+ logVersion + ". Current version = "
+ HdfsConstants.NAMENODE_LAYOUT_VERSION + ".");
}
return logVersion;
}
/**
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
* between all configured storage locations and the actual storage locations in
* use after attempting to put all of them into service.
*
* @return each storage location that has failed
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
failedLocationSet.add(sl.getFile().getAbsolutePath());
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
for (String failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
return volumeFailureInfos;
}
/**
* Test conversion from a block file path to its target trash
* directory.
*/
public void getTrashDirectoryForBlockFile(String fileName, int nestingLevel) {
final String blockFileSubdir = makeRandomBlockFileSubdir(nestingLevel);
final String blockFileName = fileName;
String testFilePath =
storage.getSingularStorageDir().getRoot() + File.separator +
Storage.STORAGE_DIR_CURRENT +
blockFileSubdir + blockFileName;
String expectedTrashPath =
storage.getSingularStorageDir().getRoot() + File.separator +
BlockPoolSliceStorage.TRASH_ROOT_DIR +
blockFileSubdir.substring(0, blockFileSubdir.length() - 1);
LOG.info("Got subdir " + blockFileSubdir);
LOG.info("Generated file path " + testFilePath);
assertThat(storage.getTrashDirectory(new File(testFilePath)), is(expectedTrashPath));
}
/**
* For block pool, verify that the current and/or previous exist as indicated
* by the method parameters. If previous exists, verify that
* it hasn't been modified by comparing the checksum of all it's
* containing files with their original checksum. It is assumed that
* the server has recovered.
* @param baseDirs directories pointing to block pool storage
* @param bpid block pool Id
* @param currentShouldExist current directory exists under storage
* @param currentShouldExist previous directory exists under storage
*/
void checkResultBlockPool(String[] baseDirs, boolean currentShouldExist,
boolean previousShouldExist) throws IOException
{
if (currentShouldExist) {
for (int i = 0; i < baseDirs.length; i++) {
File bpCurDir = new File(baseDirs[i], Storage.STORAGE_DIR_CURRENT);
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir,
false), UpgradeUtilities.checksumMasterBlockPoolContents());
}
}
if (previousShouldExist) {
for (int i = 0; i < baseDirs.length; i++) {
File bpPrevDir = new File(baseDirs[i], Storage.STORAGE_DIR_PREVIOUS);
assertTrue(bpPrevDir.isDirectory());
assertEquals(
UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir,
false), UpgradeUtilities.checksumMasterBlockPoolContents());
}
}
}
/**
* Simulate the {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} of a
* populated DFS filesystem.
* This method populates for each parent directory, <code>parent/dirName</code>
* with the content of block pool storage directory that comes from a singleton
* datanode master (that contains version and block files). If the destination
* directory does not exist, it will be created. If the directory already
* exists, it will first be deleted.
*
* @param parents parent directory where {@code dirName} is created
* @param dirName directory under which storage directory is created
* @param bpid block pool id for which the storage directory is created.
* @return the array of created directories
*/
public static File[] createBlockPoolStorageDirs(String[] parents,
String dirName, String bpid) throws Exception {
File[] retVal = new File[parents.length];
Path bpCurDir = new Path(MiniDFSCluster.getBPDir(datanodeStorage,
bpid, Storage.STORAGE_DIR_CURRENT));
for (int i = 0; i < parents.length; i++) {
File newDir = new File(parents[i] + "/current/" + bpid, dirName);
createEmptyDirs(new String[] {newDir.toString()});
LocalFileSystem localFS = FileSystem.getLocal(new HdfsConfiguration());
localFS.copyToLocalFile(bpCurDir,
new Path(newDir.toString()),
false);
retVal[i] = newDir;
}
return retVal;
}
/**
* Make sure that an HA NN will start if a previous upgrade was in progress.
*/
@Test
public void testStartingWithUpgradeInProgressSucceeds() throws Exception {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
// Simulate an upgrade having started.
for (int i = 0; i < 2; i++) {
for (URI uri : cluster.getNameDirs(i)) {
File prevTmp = new File(new File(uri), Storage.STORAGE_TMP_PREVIOUS);
LOG.info("creating previous tmp dir: " + prevTmp);
assertTrue(prevTmp.mkdirs());
}
}
cluster.restartNameNodes();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Corrupts the MD5 sum of the fsimage.
*
* @param corruptAll
* whether to corrupt one or all of the MD5 sums in the configured
* namedirs
* @throws IOException
*/
private void corruptFSImageMD5(boolean corruptAll) throws IOException {
List<URI> nameDirs = (List<URI>)FSNamesystem.getNamespaceDirs(config);
// Corrupt the md5 files in all the namedirs
for (URI uri: nameDirs) {
// Directory layout looks like:
// test/data/dfs/nameN/current/{fsimage,edits,...}
File nameDir = new File(uri.getPath());
File dfsDir = nameDir.getParentFile();
assertEquals(dfsDir.getName(), "dfs"); // make sure we got right dir
// Set the md5 file to all zeros
File imageFile = new File(nameDir,
Storage.STORAGE_DIR_CURRENT + "/"
+ NNStorage.getImageFileName(0));
MD5FileUtils.saveMD5File(imageFile, new MD5Hash(new byte[16]));
// Only need to corrupt one if !corruptAll
if (!corruptAll) {
break;
}
}
}
private static void createStorageDirs(DataStorage storage, Configuration conf,
int numDirs) throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
for (int i = 0; i < numDirs; i++) {
File loc = new File(BASE_DIR + "/data" + i);
dirStrings.add(new Path(loc.toString()).toUri().toString());
loc.mkdirs();
dirs.add(createStorageDirectory(loc));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
String dataDir = StringUtils.join(",", dirStrings);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
when(storage.dirIterator()).thenReturn(dirs.iterator());
when(storage.getNumStorageDirs()).thenReturn(numDirs);
}
/**
* Test conversion from a block file path to its target trash
* directory.
*/
public void getTrashDirectoryForBlockFile(String fileName, int nestingLevel) {
final String blockFileSubdir = makeRandomBlockFileSubdir(nestingLevel);
final String blockFileName = fileName;
String testFilePath =
storage.getSingularStorageDir().getRoot() + File.separator +
Storage.STORAGE_DIR_CURRENT +
blockFileSubdir + blockFileName;
String expectedTrashPath =
storage.getSingularStorageDir().getRoot() + File.separator +
BlockPoolSliceStorage.TRASH_ROOT_DIR +
blockFileSubdir.substring(0, blockFileSubdir.length() - 1);
LOG.info("Got subdir " + blockFileSubdir);
LOG.info("Generated file path " + testFilePath);
assertThat(storage.getTrashDirectory(new File(testFilePath)), is(expectedTrashPath));
}
public void getRestoreDirectoryForBlockFile(String fileName, int nestingLevel) {
BlockPoolSliceStorage storage = makeBlockPoolStorage();
final String blockFileSubdir = makeRandomBlockFileSubdir(nestingLevel);
final String blockFileName = fileName;
String deletedFilePath =
storage.getSingularStorageDir().getRoot() + File.separator +
BlockPoolSliceStorage.TRASH_ROOT_DIR +
blockFileSubdir + blockFileName;
String expectedRestorePath =
storage.getSingularStorageDir().getRoot() + File.separator +
Storage.STORAGE_DIR_CURRENT +
blockFileSubdir.substring(0, blockFileSubdir.length() - 1);
LOG.info("Generated deleted file path " + deletedFilePath);
assertThat(storage.getRestoreDirectory(new File(deletedFilePath)),
is(expectedRestorePath));
}
public void getRestoreDirectoryForBlockFile(String fileName, int nestingLevel) {
BlockPoolSliceStorage storage = makeBlockPoolStorage();
final String blockFileSubdir = makeRandomBlockFileSubdir(nestingLevel);
final String blockFileName = fileName;
String deletedFilePath =
storage.getSingularStorageDir().getRoot() + File.separator +
BlockPoolSliceStorage.TRASH_ROOT_DIR +
blockFileSubdir + blockFileName;
String expectedRestorePath =
storage.getSingularStorageDir().getRoot() + File.separator +
Storage.STORAGE_DIR_CURRENT +
blockFileSubdir.substring(0, blockFileSubdir.length() - 1);
LOG.info("Generated deleted file path " + deletedFilePath);
assertThat(storage.getRestoreDirectory(new File(deletedFilePath)),
is(expectedRestorePath));
}
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
String fileid = ImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(
NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
dstFiles.get(0).length() + " bytes.");
return hash;
}
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
JournalInfo journalInfo) // active name-node
throws IOException {
super();
this.bnRegistration = bnReg;
this.journalInfo = journalInfo;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
try {
this.backupNode = NameNodeProxies.createNonHAProxy(new HdfsConfiguration(),
bnAddress, JournalProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy();
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
/**
* Read the header of fsedit log
* @param in fsedit stream
* @return the edit log version number
* @throws IOException if error occurs
*/
@VisibleForTesting
static int readLogVersion(DataInputStream in, boolean verifyLayoutVersion)
throws IOException, LogHeaderCorruptException {
int logVersion;
try {
logVersion = in.readInt();
} catch (EOFException eofe) {
throw new LogHeaderCorruptException(
"Reached EOF when reading log header");
}
if (verifyLayoutVersion &&
(logVersion < HdfsConstants.NAMENODE_LAYOUT_VERSION || // future version
logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION)) { // unsupported
throw new LogHeaderCorruptException(
"Unexpected version of the file system log file: "
+ logVersion + ". Current version = "
+ HdfsConstants.NAMENODE_LAYOUT_VERSION + ".");
}
return logVersion;
}
/**
* Perform the upgrade of the storage dir to the given storage info. The new
* storage info is written into the current directory, and the previous.tmp
* directory is renamed to previous.
*
* @param sd the storage directory to upgrade
* @param storage info about the new upgraded versions.
* @throws IOException in the event of error
*/
public static void doUpgrade(StorageDirectory sd, Storage storage)
throws IOException {
LOG.info("Performing upgrade of storage directory " + sd.getRoot());
try {
// Write the version file, since saveFsImage only makes the
// fsimage_<txid>, and the directory is otherwise empty.
storage.writeProperties(sd);
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
Preconditions.checkState(!prevDir.exists(),
"previous directory must not exist for upgrade.");
Preconditions.checkState(tmpDir.exists(),
"previous.tmp directory must exist for upgrade.");
// rename tmp to previous
NNStorage.rename(tmpDir, prevDir);
} catch (IOException ioe) {
LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
throw ioe;
}
}
/**
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
* between all configured storage locations and the actual storage locations in
* use after attempting to put all of them into service.
*
* @return each storage location that has failed
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
failedLocationSet.add(sl.getFile().getAbsolutePath());
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
for (String failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
return volumeFailureInfos;
}
public static void createFederatedDatanodesVersionFiles(File[] parents,
int namespaceId, StorageInfo version, String dirName) throws IOException {
for (File parent : parents) {
File nsRoot = NameSpaceSliceStorage.getNsRoot(namespaceId, parent);
Properties props = new Properties();
props.setProperty(NameSpaceSliceStorage.NAMESPACE_ID,
String.valueOf(version.getNamespaceID()));
props.setProperty(NameSpaceSliceStorage.CHECK_TIME,
String.valueOf(version.getCTime()));
props.setProperty(NameSpaceSliceStorage.LAYOUT_VERSION,
String.valueOf(version.getLayoutVersion()));
File nsVersionFile = new File(new File(nsRoot,
dirName), "VERSION");
Storage.writeProps(nsVersionFile, props);
}
}
/**
* Throw appropriate exception during upgrade from 203, when editlog loading
* could fail due to opcode conflicts.
*/
private void check203UpgradeFailure(int logVersion, Throwable e)
throws IOException {
// 0.20.203 version version has conflicting opcodes with the later releases.
// The editlog must be emptied by restarting the namenode, before proceeding
// with the upgrade.
if (Storage.is203LayoutVersion(logVersion)
&& logVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
String msg = "During upgrade failed to load the editlog version "
+ logVersion + " from release 0.20.203. Please go back to the old "
+ " release and restart the namenode. This empties the editlog "
+ " and saves the namespace. Resume the upgrade after this step.";
throw new IOException(msg, e);
}
}
/**
* Iterate over all the storage directories, checking if it should be
* formatted. Format the storage if necessary and allowed by the user.
* @return True if formatting is processed
*/
private boolean format(NNStorage storage, NamespaceInfo nsInfo)
throws IOException {
// Check with the user before blowing away data.
if (!Storage.confirmFormat(storage.dirIterable(null), force, interactive)) {
storage.close();
return false;
} else {
// Format the storage (writes VERSION file)
storage.format(nsInfo);
return true;
}
}
/** Check whether the path is a valid DataNode data directory. */
private static void checkDir(File dataDir) {
Storage.StorageDirectory sd = new Storage.StorageDirectory(dataDir);
assertTrue(sd.getRoot().isDirectory());
assertTrue(sd.getCurrentDir().isDirectory());
assertTrue(sd.getVersionFile().isFile());
}
/**
* Client-side Method to fetch file from a server
* Copies the response from the URL to a list of local files.
* @param dstStorage if an error occurs writing to one of the files,
* this storage object will be notified.
* @Return a digest of the received file if getChecksum is true
*/
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
LOG.info("Opening connection to " + url);
return doGetUrl(url, localPaths, dstStorage, getChecksum);
}
@Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));
Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);
StorageLocation location = StorageLocation.parse(badDir.toString());
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}
try {
spyDataset.addVolume(location, nsInfos);
fail("Expect to throw MultipleIOException");
} catch (MultipleIOException e) {
}
FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
}
@Test(timeout=60000)
public void testAddBackRemovedVolume()
throws IOException, TimeoutException, InterruptedException,
ReconfigurationException {
startDFSCluster(1, 2);
// Create some data on every volume.
createFile(new Path("/test"), 32);
DataNode dn = cluster.getDataNodes().get(0);
Configuration conf = dn.getConf();
String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY);
String keepDataDir = oldDataDir.split(",")[0];
String removeDataDir = oldDataDir.split(",")[1];
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, keepDataDir);
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
String bpid = cluster.getNamesystem(i).getBlockPoolId();
BlockPoolSliceStorage bpsStorage =
dn.getStorage().getBPStorage(bpid);
// Make sure that there is no block pool level storage under removeDataDir.
for (int j = 0; j < bpsStorage.getNumStorageDirs(); j++) {
Storage.StorageDirectory sd = bpsStorage.getStorageDir(j);
assertFalse(sd.getRoot().getAbsolutePath().startsWith(
new File(removeDataDir).getAbsolutePath()
));
}
assertEquals(dn.getStorage().getBPStorage(bpid).getNumStorageDirs(), 1);
}
// Bring the removed directory back. It only successes if all metadata about
// this directory were removed from the previous step.
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
synchronized (this) {
volumeMap.addAll(tempVolumeMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
/**
* Removes a set of volumes from FsDataset.
* @param volumesToRemove a set of absolute root path of each volume.
* @param clearFailure set true to clear failure information.
*
* DataNode should call this function before calling
* {@link DataStorage#removeVolumes(java.util.Collection)}.
*/
@Override
public synchronized void removeVolumes(
Set<File> volumesToRemove, boolean clearFailure) {
// Make sure that all volumes are absolute path.
for (File vol : volumesToRemove) {
Preconditions.checkArgument(vol.isAbsolute(),
String.format("%s is not absolute path.", vol.getPath()));
}
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final File absRoot = sd.getRoot().getAbsoluteFile();
if (volumesToRemove.contains(absRoot)) {
LOG.info("Removing " + absRoot + " from FsDataset.");
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getCurrentDir());
volumes.removeVolume(absRoot, clearFailure);
// Removed all replica information for the blocks on the volume. Unlike
// updating the volumeMap in addVolume(), this operation does not scan
// disks.
for (String bpid : volumeMap.getBlockPoolList()) {
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
it.hasNext(); ) {
ReplicaInfo block = it.next();
final File absBasePath =
new File(block.getVolume().getBasePath()).getAbsoluteFile();
if (absBasePath.equals(absRoot)) {
invalidate(bpid, block);
it.remove();
}
}
}
storageMap.remove(sd.getStorageUuid());
}
}
setupAsyncLazyPersistThreads();
}
/**
* Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
* changed volumes.
* @param newVolumes a comma separated string that specifies the data volumes.
* @return changed volumes.
* @throws IOException if none of the directories are specified in the
* configuration.
*/
@VisibleForTesting
ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
Configuration conf = new Configuration();
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
List<StorageLocation> locations = getStorageLocations(conf);
if (locations.isEmpty()) {
throw new IOException("No directory is specified.");
}
ChangedVolumes results = new ChangedVolumes();
results.newLocations.addAll(locations);
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory dir = it.next();
boolean found = false;
for (Iterator<StorageLocation> sl = results.newLocations.iterator();
sl.hasNext(); ) {
StorageLocation location = sl.next();
if (location.getFile().getCanonicalPath().equals(
dir.getRoot().getCanonicalPath())) {
sl.remove();
results.unchangedLocations.add(location);
found = true;
break;
}
}
if (!found) {
results.deactivateLocations.add(
StorageLocation.parse(dir.getRoot().toString()));
}
}
return results;
}
@Test(expected=IOException.class)
public void testUpgradeFromPreUpgradeLVFails() throws IOException {
// Upgrade from versions prior to Storage#LAST_UPGRADABLE_LAYOUT_VERSION
// is not allowed
Storage.checkVersionUpgradable(Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION + 1);
fail("Expected IOException is not thrown");
}