下面列出了org.apache.hadoop.mapred.FileAlreadyExistsException#org.apache.hadoop.util.DiskChecker.DiskErrorException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds a list of mocked {@link HddsVolume} instances for checker tests.
 *
 * <p>Each mock has its {@code check} call stubbed twice: once for any
 * {@code Boolean} argument and once for a {@code null} argument.
 *
 * @param numVolumes number of mocked volumes to create.
 * @param health value every stubbed {@code check} call returns; when
 *               {@code null}, the stubbed calls throw a
 *               {@link DiskErrorException} instead.
 * @return the list of mocked volumes, in creation order.
 * @throws Exception propagated from the stubbed {@code check} invocations.
 */
static List<HddsVolume> makeVolumes(
    int numVolumes, VolumeCheckResult health) throws Exception {
  final boolean simulateDiskError = (health == null);
  final List<HddsVolume> mockedVolumes = new ArrayList<>(numVolumes);
  int created = 0;
  while (created < numVolumes) {
    final HddsVolume mockedVolume = mock(HddsVolume.class);
    if (simulateDiskError) {
      // A fresh exception instance per volume, shared by both stubs.
      final DiskErrorException failure =
          new DiskErrorException("Fake Exception");
      when(mockedVolume.check(any(Boolean.class))).thenThrow(failure);
      when(mockedVolume.check(isNull())).thenThrow(failure);
    } else {
      when(mockedVolume.check(any(Boolean.class))).thenReturn(health);
      when(mockedVolume.check(isNull())).thenReturn(health);
    }
    mockedVolumes.add(mockedVolume);
    ++created;
  }
  return mockedVolumes;
}
/**
 * Verify that bad volumes are filtered at startup.
 *
 * <p>The volume set is created with a {@code DummyChecker} that is
 * constructed with {@code numBadVolumes}; the test expects that many
 * entries in the failed-volume list and the remainder in the healthy list.
 *
 * @throws IOException propagated from creating the volume set.
 */
@Test
public void testBadDirectoryDetection() throws IOException {
  final int numVolumes = 5;
  final int numBadVolumes = 2;

  conf = getConfWithDataNodeDirs(numVolumes);
  final MutableVolumeSet volumeSet = new MutableVolumeSet(
      UUID.randomUUID().toString(), conf) {
    @Override
    HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
        throws DiskErrorException {
      return new DummyChecker(configuration, new Timer(), numBadVolumes);
    }
  };

  try {
    assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
    assertThat(volumeSet.getVolumesList().size(),
        is(numVolumes - numBadVolumes));
  } finally {
    // Release the volume set even when an assertion above fails, so a
    // failing test does not leave it running.
    volumeSet.shutdown();
  }
}
/**
 * Verify that all volumes are added to fail list if all volumes are bad.
 *
 * <p>The volume set is created with a {@code DummyChecker} that is
 * constructed with {@code numVolumes}; the test expects every volume in the
 * failed list and none in the healthy list.
 *
 * @throws IOException propagated from creating the volume set.
 */
@Test
public void testAllVolumesAreBad() throws IOException {
  final int numVolumes = 5;

  conf = getConfWithDataNodeDirs(numVolumes);
  final MutableVolumeSet volumeSet = new MutableVolumeSet(
      UUID.randomUUID().toString(), conf) {
    @Override
    HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
        throws DiskErrorException {
      return new DummyChecker(configuration, new Timer(), numVolumes);
    }
  };

  try {
    // assertEquals takes (expected, actual); keeping that order makes the
    // failure message report the right value as "expected".
    assertEquals(numVolumes, volumeSet.getFailedVolumesList().size());
    assertEquals(0, volumeSet.getVolumesList().size());
  } finally {
    // Release the volume set even when an assertion above fails.
    volumeSet.shutdown();
  }
}
/**
 * Resolves {@code path} against the local directory selected by
 * {@code dirNumLastAccessed}.
 *
 * @param path path to resolve under the selected local directory.
 * @param checkWrite when {@code true}, the parent directory of the resolved
 *                   path is validated with {@code DiskChecker.checkDir}.
 * @return the resolved path, or {@code null} when the check was requested
 *         and failed with a {@link DiskErrorException}.
 * @throws IOException declared by the signature.
 */
private Path createPath(String path,
    boolean checkWrite) throws IOException {
  Path baseDir = new Path(localDirs[dirNumLastAccessed]);
  Path candidate = new Path(baseDir, path);
  if (!checkWrite) {
    return candidate;
  }
  // Check whether a directory can be created here; if the disk happens to
  // be read-only the check fails.
  File parentDir = new File(candidate.getParent().toUri().getPath());
  try {
    DiskChecker.checkDir(parentDir);
  } catch (DiskErrorException d) {
    LOG.warn("Disk Error Exception: ", d);
    return null;
  }
  return candidate;
}
/**
 * Get a path from the local FS for reading. The configured local
 * directories are probed in order and the first location at which the file
 * exists is returned.
 *
 * @param pathStr path to look up; one leading slash, if present, is removed
 *                so the path resolves relative to each directory.
 * @param conf configuration handed to {@code confChanged} before searching.
 * @return the complete path of the first existing match.
 * @throws IOException a {@link DiskErrorException} when the file exists in
 *         none of the directories; other I/O errors are propagated.
 */
public synchronized Path getLocalPathToRead(String pathStr,
    Configuration conf) throws IOException {
  confChanged(conf);
  // Strip the leading slash so URI resolution yields a path inside the
  // directory being checked.
  final String relativePath =
      pathStr.startsWith("/") ? pathStr.substring(1) : pathStr;
  for (String localDir : localDirs) {
    Path candidate = new Path(localDir, relativePath);
    if (localFS.exists(candidate)) {
      return candidate;
    }
  }
  // no path found
  throw new DiskErrorException("Could not find " + relativePath
      + " in any of the configured local directories");
}
/**
 * Runs {@code DiskChecker.mkdirsWithExistsAndPermissionCheck} against mocked
 * file-system objects and verifies the resulting interactions.
 *
 * @param exists value the mocked directory reports from {@code exists()}.
 * @param before permission passed to the method under test.
 * @param after permission the mocked {@code FileStatus} reports.
 * @throws Throwable anything other than a {@link DiskErrorException} raised
 *         by the call or the verifications.
 */
private void _mkdirs(boolean exists, FsPermission before, FsPermission after)
    throws Throwable {
  File mockedDir = make(stub(File.class).returning(exists).from.exists());
  when(mockedDir.mkdir()).thenReturn(true);
  Path mockedPath = mock(Path.class); // use default stubs
  LocalFileSystem mockedFs = make(stub(LocalFileSystem.class)
      .returning(mockedDir).from.pathToFile(mockedPath));
  FileStatus mockedStatus = make(stub(FileStatus.class)
      .returning(after).from.getPermission());
  when(mockedFs.getFileStatus(mockedPath)).thenReturn(mockedStatus);

  try {
    DiskChecker.mkdirsWithExistsAndPermissionCheck(mockedFs, mockedPath,
        before);
    if (exists) {
      // Existing directory: its status and permission must have been read.
      verify(mockedFs).getFileStatus(mockedPath);
      verify(mockedStatus).getPermission();
    } else {
      // New directory: the requested permission must have been applied.
      verify(mockedFs).setPermission(mockedPath, before);
    }
  } catch (DiskErrorException e) {
    // Reference comparison is kept exactly as in the original helper.
    if (before != after) {
      assertTrue(e.getMessage().startsWith("Incorrect permission"));
    }
  }
}
/**
 * Creates a temporary file (replaced by a directory when {@code isDir}),
 * sets its permission through a shell command, and asserts that
 * {@code DiskChecker.checkDir} succeeds or fails as expected.
 *
 * @param isDir whether the temp file is swapped for a directory.
 * @param perm permission applied to the path and passed to the check.
 * @param success whether the check is expected to pass.
 * @throws Throwable any failure other than a {@link DiskErrorException}.
 */
private void _checkDirs(boolean isDir, FsPermission perm, boolean success)
    throws Throwable {
  File target = File.createTempFile("test", "tmp");
  if (isDir) {
    // Replace the temp file with a directory of the same name.
    target.delete();
    target.mkdir();
  }
  String octalMode = String.format("%04o", perm.toShort());
  String targetPath = target.getAbsolutePath();
  Shell.execCommand(
      Shell.getSetPermissionCommand(octalMode, false, targetPath));

  boolean checkPassed;
  try {
    DiskChecker.checkDir(FileSystem.getLocal(new Configuration()),
        new Path(targetPath), perm);
    checkPassed = true;
  } catch (DiskErrorException e) {
    checkPassed = false;
  }
  if (checkPassed) {
    assertTrue("checkDir success", success);
  } else {
    assertFalse("checkDir success", success);
  }
  target.delete();
}
/**
 * Creates a temporary file (replaced by a directory when {@code isDir}),
 * sets its permission through a shell command, and asserts that
 * {@code DiskChecker.checkDir(File)} succeeds or fails as expected.
 *
 * @param isDir whether the temp file is swapped for a directory.
 * @param perm permission string handed to the set-permission command.
 * @param success whether the check is expected to pass.
 * @throws Throwable any failure other than a {@link DiskErrorException}.
 */
private void _checkDirs(boolean isDir, String perm, boolean success)
    throws Throwable {
  File target = File.createTempFile("test", "tmp");
  if (isDir) {
    // Replace the temp file with a directory of the same name.
    target.delete();
    target.mkdir();
  }
  String targetPath = target.getAbsolutePath();
  Shell.execCommand(Shell.getSetPermissionCommand(perm, false, targetPath));

  boolean checkPassed;
  try {
    DiskChecker.checkDir(target);
    checkPassed = true;
  } catch (DiskErrorException e) {
    e.printStackTrace();
    checkPassed = false;
  }
  if (checkPassed) {
    assertTrue("checkDir success", success);
  } else {
    assertFalse("checkDir success", success);
  }
  target.delete();
  System.out.println("checkDir success: " + success);
}
/**
 * Resolves {@code path} against the local directory selected by
 * {@code dirNumLastAccessed}.
 *
 * @param path path to resolve under the selected local directory.
 * @param checkWrite when {@code true}, the parent directory of the resolved
 *                   path is validated with {@code DiskChecker.checkDir}.
 * @return the resolved path, or {@code null} when the check was requested
 *         and failed with a {@link DiskErrorException}.
 * @throws IOException declared by the signature.
 */
private Path createPath(String path,
    boolean checkWrite) throws IOException {
  Path baseDir = new Path(localDirs[dirNumLastAccessed]);
  Path candidate = new Path(baseDir, path);
  if (!checkWrite) {
    return candidate;
  }
  // Check whether a directory can be created here; if the disk happens to
  // be read-only the check fails.
  File parentDir = new File(candidate.getParent().toUri().getPath());
  try {
    DiskChecker.checkDir(parentDir);
  } catch (DiskErrorException d) {
    LOG.warn("Disk Error Exception: ", d);
    return null;
  }
  return candidate;
}
/**
 * Get a path from the local FS for reading. The configured local
 * directories are probed in order and the first location at which the file
 * exists is returned.
 *
 * @param pathStr path to look up; one leading slash, if present, is removed
 *                so the path resolves relative to each directory.
 * @param conf configuration handed to {@code confChanged} before searching.
 * @return the complete path of the first existing match.
 * @throws IOException a {@link DiskErrorException} when the file exists in
 *         none of the directories; other I/O errors are propagated.
 */
public synchronized Path getLocalPathToRead(String pathStr,
    Configuration conf) throws IOException {
  confChanged(conf);
  // Strip the leading slash so URI resolution yields a path inside the
  // directory being checked.
  final String relativePath =
      pathStr.startsWith("/") ? pathStr.substring(1) : pathStr;
  for (String localDir : localDirs) {
    Path candidate = new Path(localDir, relativePath);
    if (localFS.exists(candidate)) {
      return candidate;
    }
  }
  // no path found
  throw new DiskErrorException("Could not find " + relativePath
      + " in any of the configured local directories");
}
/**
 * Runs {@code DiskChecker.mkdirsWithExistsAndPermissionCheck} against mocked
 * file-system objects and verifies the resulting interactions.
 *
 * @param exists value the mocked directory reports from {@code exists()}.
 * @param before permission passed to the method under test.
 * @param after permission the mocked {@code FileStatus} reports.
 * @throws Throwable anything other than a {@link DiskErrorException} raised
 *         by the call or the verifications.
 */
private void _mkdirs(boolean exists, FsPermission before, FsPermission after)
    throws Throwable {
  File mockedDir = make(stub(File.class).returning(exists).from.exists());
  when(mockedDir.mkdir()).thenReturn(true);
  Path mockedPath = mock(Path.class); // use default stubs
  LocalFileSystem mockedFs = make(stub(LocalFileSystem.class)
      .returning(mockedDir).from.pathToFile(mockedPath));
  FileStatus mockedStatus = make(stub(FileStatus.class)
      .returning(after).from.getPermission());
  when(mockedFs.getFileStatus(mockedPath)).thenReturn(mockedStatus);

  try {
    DiskChecker.mkdirsWithExistsAndPermissionCheck(mockedFs, mockedPath,
        before);
    if (exists) {
      // Existing directory: its status and permission must have been read.
      verify(mockedFs).getFileStatus(mockedPath);
      verify(mockedStatus).getPermission();
    } else {
      // New directory: the requested permission must have been applied.
      verify(mockedFs).setPermission(mockedPath, before);
    }
  } catch (DiskErrorException e) {
    // Reference comparison is kept exactly as in the original helper.
    if (before != after) {
      assertTrue(e.getMessage().startsWith("Incorrect permission"));
    }
  }
}
/**
 * Creates a temporary file (replaced by a directory when {@code isDir}),
 * sets its permission through a shell command, and asserts that
 * {@code DiskChecker.checkDir} succeeds or fails as expected.
 *
 * @param isDir whether the temp file is swapped for a directory.
 * @param perm permission applied to the path and passed to the check.
 * @param success whether the check is expected to pass.
 * @throws Throwable any failure other than a {@link DiskErrorException}.
 */
private void _checkDirs(boolean isDir, FsPermission perm, boolean success)
    throws Throwable {
  File target = File.createTempFile("test", "tmp");
  if (isDir) {
    // Replace the temp file with a directory of the same name.
    target.delete();
    target.mkdir();
  }
  String octalMode = String.format("%04o", perm.toShort());
  String targetPath = target.getAbsolutePath();
  Shell.execCommand(
      Shell.getSetPermissionCommand(octalMode, false, targetPath));

  boolean checkPassed;
  try {
    DiskChecker.checkDir(FileSystem.getLocal(new Configuration()),
        new Path(targetPath), perm);
    checkPassed = true;
  } catch (DiskErrorException e) {
    checkPassed = false;
  }
  if (checkPassed) {
    assertTrue("checkDir success", success);
  } else {
    assertFalse("checkDir success", success);
  }
  target.delete();
}
/**
 * Creates a temporary file (replaced by a directory when {@code isDir}),
 * sets its permission through a shell command, and asserts that
 * {@code DiskChecker.checkDir(File)} succeeds or fails as expected.
 *
 * @param isDir whether the temp file is swapped for a directory.
 * @param perm permission string handed to the set-permission command.
 * @param success whether the check is expected to pass.
 * @throws Throwable any failure other than a {@link DiskErrorException}.
 */
private void _checkDirs(boolean isDir, String perm, boolean success)
    throws Throwable {
  File target = File.createTempFile("test", "tmp");
  if (isDir) {
    // Replace the temp file with a directory of the same name.
    target.delete();
    target.mkdir();
  }
  String targetPath = target.getAbsolutePath();
  Shell.execCommand(Shell.getSetPermissionCommand(perm, false, targetPath));

  boolean checkPassed;
  try {
    DiskChecker.checkDir(target);
    checkPassed = true;
  } catch (DiskErrorException e) {
    e.printStackTrace();
    checkPassed = false;
  }
  if (checkPassed) {
    assertTrue("checkDir success", success);
  } else {
    assertFalse("checkDir success", success);
  }
  target.delete();
  System.out.println("checkDir success: " + success);
}
/**
 * Validates the given local directories with {@code DiskChecker.checkDir}
 * and requires that at least one of them passes.
 *
 * @param localDirs where the new TaskTracker should keep its local files;
 *                  may be {@code null}, which is treated as no directories.
 * @throws DiskErrorException if none of the directories passes the check.
 */
private static void checkLocalDirs(String[] localDirs)
    throws DiskErrorException {
  int usableDirs = 0;
  if (localDirs != null) {
    for (String localDir : localDirs) {
      try {
        DiskChecker.checkDir(new File(localDir));
        usableDirs++;
      } catch (DiskErrorException e) {
        // A failing directory is only logged; the others are still tried.
        LOG.warn("Task Tracker local " + e.getMessage());
      }
    }
  }
  if (usableDirs == 0) {
    throw new DiskErrorException(
        "all local directories are not writable");
  }
}
/**
 * Creates an {@code AvatarDataNode} over the data directories that pass
 * {@code DiskChecker.checkDir}.
 *
 * @param dataDirs candidate data directories.
 * @param conf configuration instance handed to the new node.
 * @return the new instance, or {@code null} when no directory passes the
 *         check.
 * @throws IOException propagated from constructing the node.
 */
public static AvatarDataNode makeInstance(String[] dataDirs, Configuration conf)
    throws IOException {
  ArrayList<File> validDirs = new ArrayList<File>();
  for (String dataDir : dataDirs) {
    File candidate = new File(dataDir);
    try {
      DiskChecker.checkDir(candidate);
      validDirs.add(candidate);
    } catch (DiskErrorException e) {
      // Invalid directories are skipped, not fatal.
      LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
    }
  }
  if (validDirs.isEmpty()) {
    LOG.error("All directories in dfs.data.dir are invalid.");
    return null;
  }
  String dnThreadName = "AvatarDataNode: ["
      + StringUtils.arrayToString(dataDirs) + "]";
  return new AvatarDataNode(conf, validDirs, dnThreadName);
}
/**
 * Get a path from the local FS for reading. The configured local
 * directories are probed in order and the first location at which the file
 * exists is returned.
 *
 * @param pathStr path to look up; one leading slash, if present, is removed
 *                so the path resolves relative to each directory.
 * @param conf configuration handed to {@code confChanged} before searching.
 * @return the complete path of the first existing match.
 * @throws IOException a {@link DiskErrorException} when the file exists in
 *         none of the directories; other I/O errors are propagated.
 */
public synchronized Path getLocalPathToRead(String pathStr,
    Configuration conf) throws IOException {
  confChanged(conf);
  // Strip the leading slash so URI resolution yields a path inside the
  // directory being checked.
  final String relativePath =
      pathStr.startsWith("/") ? pathStr.substring(1) : pathStr;
  for (String localDir : localDirs) {
    Path candidate = new Path(localDir, relativePath);
    if (localFS.exists(candidate)) {
      return candidate;
    }
  }
  // no path found
  throw new DiskErrorException("Could not find " + relativePath
      + " in any of the configured local directories");
}
/**
 * Make an instance of DataNode over the given data directories that pass
 * {@code DiskChecker.checkDir}; at least one must pass.
 *
 * @param dataDirs List of directories, where the new DataNode instance should
 *                 keep its files.
 * @param conf Configuration instance to use.
 * @return DataNode instance for given list of data dirs and conf, or null if
 *         no directory from this directory list passes the check.
 * @throws IOException propagated from constructing the DataNode.
 */
public static DataNode makeInstance(String[] dataDirs, Configuration conf)
    throws IOException {
  ArrayList<File> validDirs = new ArrayList<File>();
  for (String dataDir : dataDirs) {
    File candidate = new File(dataDir);
    try {
      DiskChecker.checkDir(candidate);
      validDirs.add(candidate);
    } catch (DiskErrorException e) {
      // Invalid directories are skipped, not fatal.
      LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
    }
  }
  if (validDirs.isEmpty()) {
    LOG.error("All directories in dfs.data.dir are invalid.");
    return null;
  }
  return new DataNode(conf, validDirs);
}
/**
 * Validates the given local directories with {@code DiskChecker.checkDir}
 * and requires that at least one of them passes.
 *
 * @param localDirs where the new TaskTracker should keep its local files;
 *                  may be {@code null}, which is treated as no directories.
 * @throws DiskErrorException if none of the directories passes the check.
 */
private static void checkLocalDirs(String[] localDirs)
    throws DiskErrorException {
  int usableDirs = 0;
  if (localDirs != null) {
    for (String localDir : localDirs) {
      try {
        DiskChecker.checkDir(new File(localDir));
        usableDirs++;
      } catch (DiskErrorException e) {
        // A failing directory is only logged; the others are still tried.
        LOG.warn("Task Tracker local " + e.getMessage());
      }
    }
  }
  if (usableDirs == 0) {
    throw new DiskErrorException(
        "all local directories are not writable");
  }
}
/**
 * Get a path from the local FS for reading. The configured local
 * directories are probed in order and the first location at which the file
 * exists is returned.
 *
 * @param pathStr path to look up; one leading slash, if present, is removed
 *                so the path resolves relative to each directory.
 * @param conf configuration handed to {@code confChanged} before searching.
 * @return the complete path of the first existing match.
 * @throws IOException a {@link DiskErrorException} when the file exists in
 *         none of the directories; other I/O errors are propagated.
 */
public synchronized Path getLocalPathToRead(String pathStr,
    Configuration conf) throws IOException {
  confChanged(conf);
  // Strip the leading slash so URI resolution yields a path inside the
  // directory being checked.
  final String relativePath =
      pathStr.startsWith("/") ? pathStr.substring(1) : pathStr;
  for (String localDir : localDirs) {
    Path candidate = new Path(localDir, relativePath);
    if (localFS.exists(candidate)) {
      return candidate;
    }
  }
  // no path found
  throw new DiskErrorException("Could not find " + relativePath
      + " in any of the configured local directories");
}
/**
 * Make an instance of DataNode over the given data directories that pass
 * {@code DiskChecker.checkDir}; at least one must pass.
 *
 * @param dataDirs List of directories, where the new DataNode instance should
 *                 keep its files.
 * @param conf Configuration instance to use.
 * @return DataNode instance for given list of data dirs and conf, or null if
 *         no directory from this directory list passes the check.
 * @throws IOException propagated from constructing the DataNode.
 */
public static DataNode makeInstance(String[] dataDirs, Configuration conf)
    throws IOException {
  ArrayList<File> validDirs = new ArrayList<File>();
  for (String dataDir : dataDirs) {
    File candidate = new File(dataDir);
    try {
      DiskChecker.checkDir(candidate);
      validDirs.add(candidate);
    } catch (DiskErrorException e) {
      // Invalid directories are skipped, not fatal.
      LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
    }
  }
  if (validDirs.isEmpty()) {
    LOG.error("All directories in dfs.data.dir are invalid.");
    return null;
  }
  return new DataNode(conf, validDirs);
}
/**
 * Stubs {@code mm.reserve} to throw a {@link DiskErrorException} while the
 * mocked connection serves a valid shuffle response, then verifies that
 * {@code ss.reportLocalError} is invoked.
 */
@Test
public void testReduceOutOfDiskSpace() throws Throwable {
  LOG.info("testReduceOutOfDiskSpace");

  Fetcher<Text,Text> fetcher = new FakeFetcher<Text,Text>(job, id, ss, mm,
      r, metrics, except, key, connection);

  // Serialize a shuffle header to act as the HTTP response body.
  String expectedReplyHash =
      SecureShuffleUtils.generateHash(encHash.getBytes(), key);
  ShuffleHeader shuffleHeader =
      new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
  shuffleHeader.write(new DataOutputStream(headerBytes));
  ByteArrayInputStream responseBody =
      new ByteArrayInputStream(headerBytes.toByteArray());

  // Mocked connection: 200 response with the expected shuffle headers.
  when(connection.getResponseCode()).thenReturn(200);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  when(connection.getHeaderField(
      SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
      .thenReturn(expectedReplyHash);
  when(connection.getInputStream()).thenReturn(responseBody);

  // Reserving space for the map output fails with a disk error.
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenThrow(new DiskErrorException("No disk space available"));

  fetcher.copyFromHost(host);

  verify(ss).reportLocalError(any(IOException.class));
}
/**
 * {@inheritDoc}
 *
 * <p>Looks up the pipeline for the datanode; when one exists and
 * {@code contains(index, id)} holds, logs the description and throws a
 * {@link DiskErrorException} carrying it. Otherwise does nothing.
 */
public void run(DatanodeID id) throws IOException {
  final Pipeline pipeline = getPipelineTest().getPipelineForDatanode(id);
  if (pipeline != null && pipeline.contains(index, id)) {
    final String description = super.toString(id);
    FiTestUtil.LOG.info(description);
    throw new DiskErrorException(description);
  }
}
/**
 * Stubs {@code mm.reserve} to throw a {@link DiskErrorException} while the
 * mocked connection serves a valid shuffle response, then verifies that
 * {@code ss.reportLocalError} is invoked.
 */
@Test
public void testReduceOutOfDiskSpace() throws Throwable {
  LOG.info("testReduceOutOfDiskSpace");

  Fetcher<Text,Text> fetcher = new FakeFetcher<Text,Text>(job, id, ss, mm,
      r, metrics, except, key, connection);

  // Serialize a shuffle header to act as the HTTP response body.
  String expectedReplyHash =
      SecureShuffleUtils.generateHash(encHash.getBytes(), key);
  ShuffleHeader shuffleHeader =
      new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
  shuffleHeader.write(new DataOutputStream(headerBytes));
  ByteArrayInputStream responseBody =
      new ByteArrayInputStream(headerBytes.toByteArray());

  // Mocked connection: 200 response with the expected shuffle headers.
  when(connection.getResponseCode()).thenReturn(200);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  when(connection.getHeaderField(
      SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
      .thenReturn(expectedReplyHash);
  when(connection.getInputStream()).thenReturn(responseBody);

  // Reserving space for the map output fails with a disk error.
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenThrow(new DiskErrorException("No disk space available"));

  fetcher.copyFromHost(host);

  verify(ss).reportLocalError(any(IOException.class));
}
/**
 * {@inheritDoc}
 *
 * <p>Looks up the pipeline for the datanode; when one exists and
 * {@code contains(index, id)} holds, logs the description and throws a
 * {@link DiskErrorException} carrying it. Otherwise does nothing.
 */
public void run(DatanodeID id) throws IOException {
  final Pipeline pipeline = getPipelineTest().getPipelineForDatanode(id);
  if (pipeline != null && pipeline.contains(index, id)) {
    final String description = super.toString(id);
    FiTestUtil.LOG.info(description);
    throw new DiskErrorException(description);
  }
}
/**
 * This method gets called every time before any read/write to make sure
 * that any change to localDirs is reflected immediately.
 *
 * <p>When the value stored under {@code contextCfgItemName} differs from the
 * one saved by the previous call, the directory list is rebuilt: each
 * configured directory is created if necessary, validated with
 * {@code DiskChecker.checkDir}, and dropped with a warning when it fails.
 * The round-robin start index is then re-randomized.
 *
 * @param conf configuration to read the local directory list from.
 * @throws IOException if {@code contextCfgItemName} is not set in
 *         {@code conf}; errors from obtaining the local file system are
 *         propagated as well.
 */
private void confChanged(Configuration conf) throws IOException {
  String newLocalDirs = conf.get(contextCfgItemName);
  if (newLocalDirs == null) {
    // Fail with a descriptive error instead of a NullPointerException on
    // the equals() call below.
    throw new IOException(contextCfgItemName + " not configured");
  }
  if (!newLocalDirs.equals(savedLocalDirs)) {
    // The private copy is only needed when the directory list is rebuilt.
    Configuration newConf = new Configuration(conf);
    newConf.setLong("dfs.df.interval", 30000);
    localDirs = newConf.getStrings(contextCfgItemName);
    localFS = FileSystem.getLocal(newConf);
    int numDirs = localDirs.length;
    ArrayList<String> dirs = new ArrayList<String>(numDirs);
    ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
    for (int i = 0; i < numDirs; i++) {
      try {
        // filter problematic directories
        Path tmpDir = new Path(localDirs[i]);
        if (localFS.mkdirs(tmpDir) || localFS.exists(tmpDir)) {
          try {
            DiskChecker.checkDir(new File(localDirs[i]));
            // Build the DF before touching either list so a failure here
            // cannot leave dirs and dfList with different sizes.
            DF df = new DF(new File(localDirs[i]), newConf);
            dirs.add(localDirs[i]);
            dfList.add(df);
          } catch (DiskErrorException de) {
            LOG.warn(localDirs[i] + " is not writable\n"
                + StringUtils.stringifyException(de));
          }
        } else {
          LOG.warn("Failed to create " + localDirs[i]);
        }
      } catch (IOException ie) {
        // Ignore this directory and continue with the remaining ones.
        LOG.warn("Failed to create " + localDirs[i] + ": "
            + ie.getMessage() + "\n" + StringUtils.stringifyException(ie));
      }
    }
    localDirs = dirs.toArray(new String[dirs.size()]);
    dirDF = dfList.toArray(new DF[dirs.size()]);
    savedLocalDirs = newLocalDirs;
    // Randomize the first disk picked in the round-robin selection.
    // Random.nextInt(0) throws IllegalArgumentException, so fall back to 0
    // when every directory was filtered out.
    dirNumLastAccessed =
        dirs.isEmpty() ? 0 : dirIndexRandomizer.nextInt(dirs.size());
  }
}
/**
 * Resolves {@code path} against the local directory selected by
 * {@code dirNumLastAccessed} and validates the parent directory of the
 * result with {@code DiskChecker.checkDir}.
 *
 * @param path path to resolve under the selected local directory.
 * @return the resolved path, or {@code null} when the check fails with a
 *         {@link DiskErrorException}.
 * @throws IOException declared by the signature.
 */
private Path createPath(String path) throws IOException {
  Path baseDir = new Path(localDirs[dirNumLastAccessed]);
  Path candidate = new Path(baseDir, path);
  // Check whether a directory can be created here; if the disk happens to
  // be read-only the check fails.
  File parentDir = new File(candidate.getParent().toUri().getPath());
  try {
    DiskChecker.checkDir(parentDir);
  } catch (DiskErrorException d) {
    LOG.warn(StringUtils.stringifyException(d));
    return null;
  }
  return candidate;
}
/**
 * Checks this directory with {@code DiskChecker.checkDir} and then
 * recursively checks every child directory, if there are any.
 *
 * @throws DiskErrorException if this directory or any descendant fails the
 *         check.
 */
public void checkDirTree() throws DiskErrorException {
  DiskChecker.checkDir(dir);
  FSDir[] subDirs = this.getChildren();
  if (subDirs == null) {
    return;
  }
  for (FSDir subDir : subDirs) {
    subDir.checkDirTree();
  }
}
/**
 * Goes over all the volumes and runs {@code checkDirs} on each one of them.
 * Every volume that throws {@link DiskErrorException} is collected and then
 * removed from the list of active volumes.
 *
 * @return list of all the removed volumes, or {@code null} when no volume
 *         failed.
 */
private List<FSVolume> checkDirs() {
  List<FSVolume> removedVolumes = null;
  FSVolume[] fsVolumes = this.getVolumes();
  for (FSVolume fsv : fsVolumes) {
    try {
      fsv.checkDirs();
    } catch (DiskErrorException e) {
      DataNode.LOG.warn("Removing failed volume " + fsv + ": ", e);
      // Create the list lazily, but record EVERY failed volume; adding
      // only inside the null check would keep just the first failure.
      if (removedVolumes == null) {
        removedVolumes = new ArrayList<FSVolume>();
      }
      removedVolumes.add(fsv);
    }
  }
  if (removedVolumes != null && removedVolumes.size() > 0) {
    volumeList.removeVolumes(removedVolumes);
    DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed="
        + removedVolumes.size() + " volumes. List of current volumes: "
        + toString());
  }
  return removedVolumes;
}
/**
 * Checks whether the data directories are healthy. When some volumes
 * failed, the entries belonging to them are removed from the volume map and
 * the failure is reported to the caller.
 *
 * @throws DiskErrorException listing the failed volumes, when at least one
 *         volume failed its check.
 */
public void checkDataDir() throws DiskErrorException {
  // NOTE(review): neither counter is updated anywhere in this method, so
  // the summary log below always reports zero for both.
  long totalBlocks = 0;
  long removedBlocks = 0;

  List<FSVolume> failedVolumes;
  lock.readLock().lock();
  try {
    failedVolumes = volumes.checkDirs();
  } finally {
    lock.readLock().unlock();
  }

  // Nothing failed, nothing to clean up.
  if (failedVolumes == null) {
    return;
  }

  // Remove the blocks related to the failed volumes, timing the removal.
  final long startMs = System.currentTimeMillis();
  lock.writeLock().lock();
  try {
    volumeMap.removeUnhealthyVolumes(failedVolumes);
  } finally {
    lock.writeLock().unlock();
  }
  final long elapsedMs = System.currentTimeMillis() - startMs;
  DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removedBlocks + " out of "
      + totalBlocks + "(took " + elapsedMs + " millisecs)");

  // Report the error.
  StringBuilder report = new StringBuilder("DataNode failed volumes:");
  for (FSVolume failed : failedVolumes) {
    report.append(failed.toString()).append(";");
  }
  throw new DiskErrorException(report.toString());
}
/**
 * Check if there is a disk failure and if so, handle the error.
 *
 * <p>Only one check runs at a time, guarded by the {@code checkingDisk}
 * flag, and a check is skipped when the previous one finished less than
 * {@code minDiskCheckIntervalMsec} ago.
 *
 * @throws IOException declared by the signature; a
 *         {@link DiskErrorException} from the data directory check is
 *         passed to {@code handleDiskError} instead of being rethrown.
 */
protected void checkDiskError() throws IOException {
  // Concurrent disk checks are disallowed: they don't help but can
  // significantly impact performance and reliability of the system.
  if (!checkingDisk.compareAndSet(false, true)) {
    LOG.info("checkDiskError is already running.");
    return;
  }
  try {
    // Don't check the disks again if the last check was recent.
    final long now = System.currentTimeMillis();
    final boolean checkedRecently =
        now - timeLastCheckDisk < minDiskCheckIntervalMsec;
    if (checkedRecently) {
      LOG.info("checkDiskError finished within "
          + minDiskCheckIntervalMsec + " mses. Skip this one.");
      return;
    }
    data.checkDataDir();
    timeLastCheckDisk = System.currentTimeMillis();
  } catch (DiskErrorException de) {
    handleDiskError(de.getMessage());
  } finally {
    // Always clear the flag so later checks are not blocked.
    checkingDisk.set(false);
  }
}