Listed below are example usages of the org.apache.hadoop.util.DiskChecker API class; you can also follow the links to GitHub to view the source code.
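Before the collected examples, here is a minimal self-contained sketch (not taken from any of the projects below; the path is hypothetical) of the core call, DiskChecker.checkDir(File), which creates the directory if necessary and then verifies that it is a readable, writable, executable directory:

import java.io.File;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

public class DiskCheckerDemo {
  public static void main(String[] args) {
    File dir = new File("/tmp/demo-dir"); // hypothetical path
    try {
      DiskChecker.checkDir(dir); // creates if missing, then checks r/w/x
      System.out.println(dir + " passed the disk check");
    } catch (DiskErrorException e) {
      System.err.println("Disk check failed: " + e.getMessage());
    }
  }
}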
/**
* Check if the given local directories
* (and parent directories, if necessary) can be created.
* @param localDirs where the new TaskTracker should keep its local files.
* @throws DiskErrorException if none of the local directories is writable
*/
private static void checkLocalDirs(String[] localDirs)
throws DiskErrorException {
boolean writable = false;
if (localDirs != null) {
for (int i = 0; i < localDirs.length; i++) {
try {
DiskChecker.checkDir(new File(localDirs[i]));
writable = true;
} catch(DiskErrorException e) {
LOG.warn("Task Tracker local " + e.getMessage());
}
}
}
if (!writable)
throw new DiskErrorException(
"all local directories are not writable");
}
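A hedged usage sketch for the method above (the paths are assumptions): the call succeeds as long as at least one directory passes DiskChecker.checkDir, and throws only when every directory fails.

String[] localDirs = { "/data/1/mapred/local", "/data/2/mapred/local" }; // assumed paths
checkLocalDirs(localDirs); // throws DiskErrorException only if none is writable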
public static AvatarDataNode makeInstance(String[] dataDirs, Configuration conf)
throws IOException {
ArrayList<File> dirs = new ArrayList<File>();
for (int i = 0; i < dataDirs.length; i++) {
File data = new File(dataDirs[i]);
try {
DiskChecker.checkDir(data);
dirs.add(data);
} catch(DiskErrorException e) {
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
}
}
if (dirs.size() > 0) {
String dnThreadName = "AvatarDataNode: [" +
StringUtils.arrayToString(dataDirs) + "]";
return new AvatarDataNode(conf, dirs, dnThreadName);
}
LOG.error("All directories in dfs.data.dir are invalid.");
return null;
}
static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs) {
ArrayList<File> dirs = new ArrayList<File>();
for (URI dirURI : dataDirs) {
if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
continue;
}
// drop any (illegal) authority in the URI for backwards compatibility
File data = new File(dirURI.getPath());
try {
DiskChecker.checkDir(data);
dirs.add(data);
} catch (IOException e) {
LOG.warn("Invalid directory in dfs.data.dir: "
+ e.getMessage());
}
}
return dirs;
}
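A usage sketch for getDataDirsFromURIs (the URIs are assumptions): non-file schemes are logged and skipped rather than failing the whole call.

List<URI> dataDirs = Arrays.asList(
    URI.create("file:///data/1/dfs/dn"),
    URI.create("hdfs://namenode:8020/ignored")); // warned about and skipped
ArrayList<File> usable = getDataDirsFromURIs(dataDirs);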
/**
* Make an instance of DataNode after ensuring that at least one of the
* given data directories (and their parent directories, if necessary)
* can be created.
* @param dataDirs List of directories, where the new DataNode instance should
* keep its files.
* @param conf Configuration instance to use.
* @return DataNode instance for given list of data dirs and conf, or null if
* no directory from this directory list can be created.
* @throws IOException
*/
public static DataNode makeInstance(String[] dataDirs, Configuration conf)
throws IOException {
ArrayList<File> dirs = new ArrayList<File>();
for (int i = 0; i < dataDirs.length; i++) {
File data = new File(dataDirs[i]);
try {
DiskChecker.checkDir(data);
dirs.add(data);
} catch(DiskErrorException e) {
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
}
}
if (dirs.size() > 0)
return new DataNode(conf, dirs);
LOG.error("All directories in dfs.data.dir are invalid.");
return null;
}
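A hedged invocation of makeInstance, reading the directory list from the configuration key that the log messages above refer to (the surrounding setup is an assumption):

Configuration conf = new Configuration();
String[] dataDirs = conf.getStrings("dfs.data.dir");
DataNode dn = DataNode.makeInstance(dataDirs, conf);
if (dn == null) {
  // every configured directory failed DiskChecker.checkDir
}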
/**
* Run a check on the current volume to determine if it is healthy.
* @param unused context for the check, ignored.
* @return result of checking the volume.
* @throws Exception if an exception was encountered while running
* the volume check.
*/
@Override
public VolumeCheckResult check(@Nullable Boolean unused) throws Exception {
if (!hddsRootDir.exists()) {
return VolumeCheckResult.FAILED;
}
DiskChecker.checkDir(hddsRootDir);
return VolumeCheckResult.HEALTHY;
}
@Test
public void testDiskFullExceptionCreateContainer() throws Exception {
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenThrow(DiskChecker.DiskOutOfSpaceException.class);
try {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
fail("testDiskFullExceptionCreateContainer failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("disk out of space",
ex);
assertEquals(ContainerProtos.Result.DISK_OUT_OF_SPACE, ex.getResult());
}
}
Map<String, DiskErrorInformation> testDirs(List<String> dirs) {
HashMap<String, DiskErrorInformation> ret =
new HashMap<String, DiskErrorInformation>();
for (final String dir : dirs) {
String msg;
try {
File testDir = new File(dir);
DiskChecker.checkDir(testDir);
if (isDiskUsageOverPercentageLimit(testDir)) {
msg =
"used space above threshold of "
+ diskUtilizationPercentageCutoff
+ "%";
ret.put(dir,
new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
continue;
} else if (isDiskFreeSpaceUnderLimit(testDir)) {
msg =
"free space below limit of " + diskUtilizationSpaceCutoff
+ "MB";
ret.put(dir,
new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
continue;
}
// create a random dir to make sure fs isn't in read-only mode
verifyDirUsingMkdir(testDir);
} catch (IOException ie) {
ret.put(dir,
new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage()));
}
}
return ret;
}
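The helpers isDiskUsageOverPercentageLimit and isDiskFreeSpaceUnderLimit are not shown above; a minimal sketch of what they might look like, using java.io.File space accounting (the field names come from the code above, the bodies are assumptions):

private boolean isDiskUsageOverPercentageLimit(File dir) {
  long total = dir.getTotalSpace();
  long used = total - dir.getUsableSpace();
  return total > 0 && (100.0 * used) / total > diskUtilizationPercentageCutoff;
}

private boolean isDiskFreeSpaceUnderLimit(File dir) {
  long freeMB = dir.getUsableSpace() / (1024L * 1024L);
  return freeMB < diskUtilizationSpaceCutoff;
}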
private static void validateAndCreateJournalDir(File dir) throws IOException {
if (!dir.isAbsolute()) {
throw new IllegalArgumentException(
"Journal dir '" + dir + "' should be an absolute path");
}
DiskChecker.checkDir(dir);
}
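A short behavioral sketch for validateAndCreateJournalDir (the paths are assumptions): relative paths are rejected before any disk check runs, while absolute paths are created and verified by DiskChecker.checkDir.

validateAndCreateJournalDir(new File("/data/1/journal")); // created and checked
validateAndCreateJournalDir(new File("journal"));         // IllegalArgumentException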
/**
* Ensure that the tests are picking up the modified Hadoop classes
*/
private static void checkOverriddenHadoopClasses() {
List<Class<?>> modifiedHadoopClasses = Arrays.asList(BlockPoolSlice.class, DiskChecker.class,
FileUtil.class, HardLink.class, HttpServer2.class, NameNodeResourceChecker.class, RawLocalFileSystem.class);
for (Class<?> clazz : modifiedHadoopClasses) {
try {
LuceneTestCase.assertNotNull("Field on " + clazz.getCanonicalName() + " should not have been null",
clazz.getField(SOLR_HACK_FOR_CLASS_VERIFICATION_FIELD));
} catch (NoSuchFieldException e) {
LuceneTestCase.fail("Expected to load Solr modified Hadoop class " + clazz.getCanonicalName() +
" , but it was not found.");
}
}
}
/**
* Check if a data directory is healthy.
* @throws DiskErrorException
*/
public void checkDirTree() throws DiskErrorException {
DiskChecker.checkDir(dir);
FSDir[] children = this.getChildren();
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].checkDirTree();
}
}
}
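One of the checkDirs examples below calls DiskChecker.checkDirs(finalizedDir); as far as I can tell, that overload walks a directory and its subdirectories in a single call, so a manual tree walk like checkDirTree above can often be reduced to:

DiskChecker.checkDirs(dir); // checks dir and, recursively, its subdirectories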
/**
* Create the physical directories for namespaces on the DataNode.
*
* @param dataDirs
* List of data directories
* @throws IOException on errors
*/
public static void makeNameSpaceDataDir(Collection<File> dataDirs) throws IOException {
for (File data : dataDirs) {
try {
DiskChecker.checkDir(data);
} catch (IOException e) {
LOG.warn("Invalid directory in: " + data.getCanonicalPath() + ": "
+ e.getMessage());
}
}
}
/**
* Check if a data directory is healthy.
* @throws DiskErrorException
*/
public void checkDirTree() throws DiskErrorException {
DiskChecker.checkDir(dir);
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].checkDirTree();
}
}
}
@VisibleForTesting
HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
throws DiskChecker.DiskErrorException {
return new HddsVolumeChecker(configuration, new Timer());
}
public void addResource(LocalizerResourceRequestEvent request) {
// TODO handle failures, cancellation, requests by other containers
LocalizedResource rsrc = request.getResource();
LocalResourceRequest key = rsrc.getRequest();
LOG.info("Downloading public rsrc:" + key);
/*
* Multiple containers may request the same resource, so we start the
* download only when
* 1) ResourceState == DOWNLOADING, and
* 2) we can acquire the non-blocking semaphore lock.
* Otherwise we skip this resource, as it is either already being
* downloaded or has FAILED / been LOCALIZED.
*/
if (rsrc.tryAcquire()) {
if (rsrc.getState() == ResourceState.DOWNLOADING) {
LocalResource resource = request.getResource().getRequest();
try {
Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path publicDirDestPath =
publicRsrc.getPathForLocalization(key, publicRootPath,
delService);
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
pending.put(queue.submit(new FSDownload(lfs, null, conf,
publicDirDestPath, resource, request.getContext().getStatCache())),
request);
}
} catch (IOException e) {
rsrc.unlock();
publicRsrc.handle(new ResourceFailedLocalizationEvent(request
.getResource().getRequest(), e.getMessage()));
LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e);
} catch (IllegalArgumentException ie) {
rsrc.unlock();
publicRsrc.handle(new ResourceFailedLocalizationEvent(request
.getResource().getRequest(), ie.getMessage()));
LOG.error("Local path for public localization is not found. "
+ " Incorrect path. " + request.getResource().getRequest()
.getPath(), ie);
} catch (RejectedExecutionException re) {
rsrc.unlock();
publicRsrc.handle(new ResourceFailedLocalizationEvent(request
.getResource().getRequest(), re.getMessage()));
LOG.error("Failed to submit rsrc " + rsrc + " for download."
+ " Either queue is full or threadpool is shutdown.", re);
}
} else {
rsrc.unlock();
}
}
}
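A note on the DiskChecker.checkDir call in the method above: getPathForLocalization may return a destination several directory levels below publicRootPath, and checkDir also creates missing parents, so the call appears to ensure the intermediate hierarchy exists and is usable before the download task is submitted.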
Callable<Path> download(Path path, LocalResource rsrc,
UserGroupInformation ugi) throws IOException {
DiskChecker.checkDir(new File(path.toUri().getRawPath()));
return new FSDownload(lfs, ugi, conf, path, rsrc);
}
void checkDirs() throws DiskErrorException {
DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
public void checkDir(LocalFileSystem localFS, Path path)
throws DiskErrorException, IOException {
DiskChecker.checkDir(localFS, path, expectedPermission);
}
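A hedged sketch of calling the permission-checking overload directly (the path and permission are assumptions): DiskChecker.checkDir(LocalFileSystem, Path, FsPermission) creates the directory through the local file system and verifies that it carries the expected permission.

LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
FsPermission expected = new FsPermission((short) 0755); // assumed permission
DiskChecker.checkDir(localFS, new Path("/data/1/nm-local-dir"), expected);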
public void addResource(LocalizerResourceRequestEvent request) {
// TODO handle failures, cancellation, requests by other containers
LocalizedResource rsrc = request.getResource();
LocalResourceRequest key = rsrc.getRequest();
LOG.info("Downloading public rsrc:" + key);
/*
* Multiple containers may request the same resource, so we start the
* download only when
* 1) ResourceState == DOWNLOADING, and
* 2) we can acquire the non-blocking semaphore lock.
* Otherwise we skip this resource, as it is either already being
* downloaded or has FAILED / been LOCALIZED.
*/
if (rsrc.tryAcquire()) {
if (rsrc.getState() == ResourceState.DOWNLOADING) {
LocalResource resource = request.getResource().getRequest();
try {
Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path publicDirDestPath =
publicRsrc.getPathForLocalization(key, publicRootPath);
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
// In case this is not a newly initialized nm state, ensure
// initialized local/log dirs similar to LocalizerRunner
getInitializedLocalDirs();
getInitializedLogDirs();
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
pending.put(queue.submit(new FSDownload(lfs, null, conf,
publicDirDestPath, resource, request.getContext().getStatCache())),
request);
}
} catch (IOException e) {
rsrc.unlock();
publicRsrc.handle(new ResourceFailedLocalizationEvent(request
.getResource().getRequest(), e.getMessage()));
LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e);
} catch (IllegalArgumentException ie) {
rsrc.unlock();
publicRsrc.handle(new ResourceFailedLocalizationEvent(request
.getResource().getRequest(), ie.getMessage()));
LOG.error("Local path for public localization is not found. "
+ " Incorrect path. " + request.getResource().getRequest()
.getPath(), ie);
} catch (RejectedExecutionException re) {
rsrc.unlock();
publicRsrc.handle(new ResourceFailedLocalizationEvent(request
.getResource().getRequest(), re.getMessage()));
LOG.error("Failed to submit rsrc " + rsrc + " for download."
+ " Either queue is full or threadpool is shutdown.", re);
}
} else {
rsrc.unlock();
}
}
}
void checkDirs() throws DiskErrorException {
DiskChecker.checkDir(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
void checkDirs() throws DiskErrorException {
dataDir.checkDirTree();
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(detachDir);
DiskChecker.checkDir(rbwDir);
}
/**
* Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
* and increments it. This method is first called by messageReceived()
* maxSessionOpenFiles times and then on the completion of every
* sendMapOutput operation. This limits the number of open files on a node,
* which can get really large (exhausting file descriptors on the NM) if all
* sendMapOutputs are called in one go, as was done prior to this change.
* @param reduceContext used to call sendMapOutput with correct params.
* @return the ChannelFuture of the sendMapOutput, can be null.
*/
public ChannelFuture sendMap(ReduceContext reduceContext)
throws Exception {
ChannelFuture nextMap = null;
if (reduceContext.getMapsToSend().get() <
reduceContext.getMapIds().size()) {
int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
String mapId = reduceContext.getMapIds().get(nextIndex);
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(),
reduceContext.getJobId(),
reduceContext.getUser());
}
nextMap = sendMapOutput(
reduceContext.getCtx(),
reduceContext.getCtx().getChannel(),
reduceContext.getUser(), mapId,
reduceContext.getReduceRange(), info);
if (null == nextMap) {
sendError(reduceContext.getCtx(), NOT_FOUND);
return null;
}
nextMap.addListener(new ReduceMapFileCount(reduceContext));
} catch (IOException e) {
if (e instanceof DiskChecker.DiskErrorException) {
LOG.error("Shuffle error :" + e);
} else {
LOG.error("Shuffle error :", e);
}
String errorMessage = getErrorMessage(e);
sendError(reduceContext.getCtx(), errorMessage,
INTERNAL_SERVER_ERROR);
return null;
}
}
return nextMap;
}
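getErrorMessage is not shown above; a plausible minimal form (an assumption, not necessarily this project's implementation) flattens the exception's cause chain into one string:

private static String getErrorMessage(Throwable t) {
  StringBuilder sb = new StringBuilder(String.valueOf(t.getMessage()));
  while (t.getCause() != null) {
    t = t.getCause();
    sb.append(": ").append(t.getMessage());
  }
  return sb.toString();
}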
void checkDirs() throws DiskErrorException {
dataDir.checkDirTree();
DiskChecker.checkDir(tmpDir);
}