下面列出了 org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader#org.apache.hadoop.tools.util.DistCpUtils 的实例代码,您也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Appends one entry for {@code fileStatus} to the copy listing, keyed by its
 * path relative to {@code sourcePathRoot}, then updates the running totals.
 * Nothing is written when {@code shouldCopy} rejects the path.
 *
 * @param fileListWriter sequence-file writer receiving the entry.
 * @param fileStatus status of the source path being recorded.
 * @param sourcePathRoot root against which the relative path is computed.
 * @param options options forwarded to {@code shouldCopy}.
 * @throws IOException propagated from the writer.
 */
private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                CopyListingFileStatus fileStatus,
                                Path sourcePathRoot,
                                DistCpOptions options) throws IOException {
  if (LOG.isDebugEnabled()) {
    String relativeForLog =
        DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath());
    LOG.debug("REL PATH: " + relativeForLog
        + ", FULL PATH: " + fileStatus.getPath());
  }

  if (shouldCopy(fileStatus.getPath(), options)) {
    // The value is written through a FileStatus-typed reference.
    FileStatus listedStatus = fileStatus;
    Text relativeKey = new Text(
        DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()));

    fileListWriter.append(relativeKey, listedStatus);
    fileListWriter.sync();

    // Only files contribute to the byte total; every entry counts as a path.
    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
/**
 * Verifies that source and target have equal checksums, as decided by
 * {@code DistCpUtils.checksumsAreEqual}.
 *
 * @param sourceFS file system holding {@code source}.
 * @param source source path.
 * @param sourceChecksum checksum forwarded to the comparison helper.
 * @param targetFS file system holding {@code target}.
 * @param target target path.
 * @throws IOException if the checksums differ; when the block sizes also
 *         differ, the message additionally suggests -pb or -skipCrc.
 */
private void compareCheckSums(FileSystem sourceFS, Path source,
    FileChecksum sourceChecksum, FileSystem targetFS, Path target)
    throws IOException {
  if (DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
      targetFS, target)) {
    return;
  }

  StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ");
  errorMessage.append(source);
  errorMessage.append(" and ");
  errorMessage.append(target);
  errorMessage.append(".");

  long sourceBlockSize = sourceFS.getFileStatus(source).getBlockSize();
  long targetBlockSize = targetFS.getFileStatus(target).getBlockSize();
  if (sourceBlockSize != targetBlockSize) {
    // Differing block sizes get an extra hint in the error message.
    errorMessage.append(" Source and target differ in block-size.");
    errorMessage.append(" Use -pb to preserve block-sizes during copy.");
    errorMessage.append(" Alternatively, skip checksum-checks altogether, using -skipCrc.");
    errorMessage.append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
  }
  throw new IOException(errorMessage.toString());
}
/**
 * Mapper setup hook. Copies the DistCp switches found in the job
 * configuration into this mapper's fields, resolves the target file system
 * from the final target path, and initializes SSL configuration when the SSL
 * conf label is present.
 *
 * @param context Mapper's context.
 * @throws IOException On IO failure.
 * @throws InterruptedException If the job is interrupted.
 */
@Override
public void setup(Context context) throws IOException, InterruptedException {
  conf = context.getConfiguration();

  // Boolean switches; each defaults to false when absent.
  final String syncLabel = DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel();
  syncFolders = conf.getBoolean(syncLabel, false);
  final String ignoreLabel = DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel();
  ignoreFailures = conf.getBoolean(ignoreLabel, false);
  final String skipCrcLabel = DistCpOptionSwitch.SKIP_CRC.getConfigLabel();
  skipCrc = conf.getBoolean(skipCrcLabel, false);
  final String overwriteLabel = DistCpOptionSwitch.OVERWRITE.getConfigLabel();
  overWrite = conf.getBoolean(overwriteLabel, false);
  final String appendLabel = DistCpOptionSwitch.APPEND.getConfigLabel();
  append = conf.getBoolean(appendLabel, false);

  final String preserveLabel = DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel();
  preserve = DistCpUtils.unpackAttributes(conf.get(preserveLabel));

  final String workPathValue = conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
  targetWorkPath = new Path(workPathValue);
  final String finalPathValue = conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH);
  Path targetFinalPath = new Path(finalPathValue);
  targetFS = targetFinalPath.getFileSystem(conf);

  if (targetFS.exists(targetFinalPath)) {
    if (targetFS.isFile(targetFinalPath)) {
      // When target is an existing file, overwrite it.
      overWrite = true;
    }
  }

  final String sslConf = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
  if (sslConf != null) {
    initializeSSLConf(context);
  }
}
/**
 * Decides whether copying {@code source} onto {@code target} can be skipped.
 * Without sync-folders the answer is always yes. Otherwise the lengths must
 * match, the block sizes must match unless block size is not being
 * preserved, and either CRC checking is disabled or the checksums are equal.
 *
 * @param sourceFS file system holding the source file.
 * @param source status of the source file.
 * @param target status of the target file.
 * @return true when the copy can be skipped.
 * @throws IOException propagated from the checksum comparison.
 */
private boolean canSkip(FileSystem sourceFS, FileStatus source,
    FileStatus target) throws IOException {
  if (!syncFolders) {
    return true;
  }

  final boolean lengthsMatch = target.getLen() == source.getLen();

  // Block size only matters when it is among the preserved attributes.
  boolean blockSizeAcceptable = source.getBlockSize() == target.getBlockSize();
  if (!blockSizeAcceptable) {
    blockSizeAcceptable = !preserve.contains(FileAttribute.BLOCKSIZE);
  }

  if (!lengthsMatch || !blockSizeAcceptable) {
    return false;
  }
  if (skipCrc) {
    return true;
  }
  return DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
      targetFS, target.getPath());
}
/**
 * Builds one input split per chunk, capped at the configured number of map
 * tasks, assigning each used chunk to the map task with the same index.
 * The resulting split count is published to the job configuration.
 *
 * @param jobContext context supplying the job ID and configuration.
 * @param chunks chunks available for assignment.
 * @return the splits, one per assigned chunk.
 * @throws IOException propagated from chunk assignment.
 */
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
    throws IOException {
  final int splitCount = Math.min(
      getNumMapTasks(jobContext.getConfiguration()), chunks.size());
  final List<InputSplit> result = new ArrayList<InputSplit>(splitCount);

  int index = 0;
  while (index < splitCount) {
    TaskID mapTaskId = new TaskID(jobContext.getJobID(), TaskType.MAP, index);
    DynamicInputChunk chunk = chunks.get(index);
    chunk.assignTo(mapTaskId);
    // Setting non-zero length for FileSplit size, to avoid a possible
    // future when 0-sized file-splits are considered "empty" and skipped
    // over.
    result.add(new FileSplit(chunk.getPath(), 0,
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
    index++;
  }

  DistCpUtils.publish(jobContext.getConfiguration(),
      CONF_LABEL_NUM_SPLITS, result.size());
  return result;
}
/**
 * Writes this object's option values into the given configuration, one
 * entry per DistCp option switch. These will be used in the
 * Mapper/committer.
 *
 * @param conf - Configuration object to which the options need to be added
 */
public void appendToConf(Configuration conf) {
  // Boolean-style switches, stored as their string form.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, String.valueOf(atomicCommit));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES, String.valueOf(ignoreFailures));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS, String.valueOf(syncFolder));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING, String.valueOf(deleteMissing));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, String.valueOf(overwrite));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, String.valueOf(append));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF, String.valueOf(useDiff));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC));

  // Bandwidth and the packed preserve-attribute set.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, String.valueOf(mapBandwidth));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, DistCpUtils.packAttributes(preserveStatus));
}
/**
 * Runs the mapper over every path in {@code pathList} and asserts that the
 * SKIP counter equals {@code nFiles}. Any exception fails the test.
 *
 * @param fs file system used to look up each path's status.
 * @param copyMapper mapper under test.
 * @param context mapper context whose SKIP counter is checked.
 */
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
  try {
    for (Path path : pathList) {
      Text relativePath = new Text(
          DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path));
      CopyListingFileStatus listingStatus =
          new CopyListingFileStatus(fs.getFileStatus(path));
      copyMapper.map(relativePath, listingStatus, context);
    }

    long skipped = context.getCounter(CopyMapper.Counter.SKIP).getValue();
    Assert.assertEquals(nFiles, skipped);
  } catch (Exception exception) {
    Assert.fail("Caught unexpected exception:" + exception.getMessage());
  }
}
/**
 * Appends one entry for {@code fileStatus} to the copy listing, keyed by its
 * path relative to {@code sourcePathRoot}, then updates the running totals.
 * Nothing is written when {@code shouldCopy} rejects the path.
 *
 * @param fileListWriter sequence-file writer receiving the entry.
 * @param fileStatus status of the source path being recorded.
 * @param sourcePathRoot root against which the relative path is computed.
 * @param options options forwarded to {@code shouldCopy}.
 * @throws IOException propagated from the writer.
 */
private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                CopyListingFileStatus fileStatus,
                                Path sourcePathRoot,
                                DistCpOptions options) throws IOException {
  if (LOG.isDebugEnabled()) {
    String relativeForLog =
        DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath());
    LOG.debug("REL PATH: " + relativeForLog
        + ", FULL PATH: " + fileStatus.getPath());
  }

  if (shouldCopy(fileStatus.getPath(), options)) {
    // The value is written through a FileStatus-typed reference.
    FileStatus listedStatus = fileStatus;
    Text relativeKey = new Text(
        DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()));

    fileListWriter.append(relativeKey, listedStatus);
    fileListWriter.sync();

    // Only files contribute to the byte total; every entry counts as a path.
    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
/**
 * Verifies that source and target have equal checksums, as decided by
 * {@code DistCpUtils.checksumsAreEqual}.
 *
 * @param sourceFS file system holding {@code source}.
 * @param source source path.
 * @param sourceChecksum checksum forwarded to the comparison helper.
 * @param targetFS file system holding {@code target}.
 * @param target target path.
 * @throws IOException if the checksums differ; when the block sizes also
 *         differ, the message additionally suggests -pb or -skipCrc.
 */
private void compareCheckSums(FileSystem sourceFS, Path source,
    FileChecksum sourceChecksum, FileSystem targetFS, Path target)
    throws IOException {
  if (DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
      targetFS, target)) {
    return;
  }

  StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ");
  errorMessage.append(source);
  errorMessage.append(" and ");
  errorMessage.append(target);
  errorMessage.append(".");

  long sourceBlockSize = sourceFS.getFileStatus(source).getBlockSize();
  long targetBlockSize = targetFS.getFileStatus(target).getBlockSize();
  if (sourceBlockSize != targetBlockSize) {
    // Differing block sizes get an extra hint in the error message.
    errorMessage.append(" Source and target differ in block-size.");
    errorMessage.append(" Use -pb to preserve block-sizes during copy.");
    errorMessage.append(" Alternatively, skip checksum-checks altogether, using -skipCrc.");
    errorMessage.append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
  }
  throw new IOException(errorMessage.toString());
}
/**
 * Mapper setup hook. Copies the DistCp switches found in the job
 * configuration into this mapper's fields, resolves the target file system
 * from the final target path, and initializes SSL configuration when the SSL
 * conf label is present.
 *
 * @param context Mapper's context.
 * @throws IOException On IO failure.
 * @throws InterruptedException If the job is interrupted.
 */
@Override
public void setup(Context context) throws IOException, InterruptedException {
  conf = context.getConfiguration();

  // Boolean switches; each defaults to false when absent.
  final String syncLabel = DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel();
  syncFolders = conf.getBoolean(syncLabel, false);
  final String ignoreLabel = DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel();
  ignoreFailures = conf.getBoolean(ignoreLabel, false);
  final String skipCrcLabel = DistCpOptionSwitch.SKIP_CRC.getConfigLabel();
  skipCrc = conf.getBoolean(skipCrcLabel, false);
  final String overwriteLabel = DistCpOptionSwitch.OVERWRITE.getConfigLabel();
  overWrite = conf.getBoolean(overwriteLabel, false);
  final String appendLabel = DistCpOptionSwitch.APPEND.getConfigLabel();
  append = conf.getBoolean(appendLabel, false);

  final String preserveLabel = DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel();
  preserve = DistCpUtils.unpackAttributes(conf.get(preserveLabel));

  final String workPathValue = conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
  targetWorkPath = new Path(workPathValue);
  final String finalPathValue = conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH);
  Path targetFinalPath = new Path(finalPathValue);
  targetFS = targetFinalPath.getFileSystem(conf);

  if (targetFS.exists(targetFinalPath)) {
    if (targetFS.isFile(targetFinalPath)) {
      // When target is an existing file, overwrite it.
      overWrite = true;
    }
  }

  final String sslConf = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
  if (sslConf != null) {
    initializeSSLConf(context);
  }
}
/**
 * Decides whether copying {@code source} onto {@code target} can be skipped.
 * Without sync-folders the answer is always yes. Otherwise the lengths must
 * match, the block sizes must match unless block size is not being
 * preserved, and either CRC checking is disabled or the checksums are equal.
 *
 * @param sourceFS file system holding the source file.
 * @param source status of the source file.
 * @param target status of the target file.
 * @return true when the copy can be skipped.
 * @throws IOException propagated from the checksum comparison.
 */
private boolean canSkip(FileSystem sourceFS, FileStatus source,
    FileStatus target) throws IOException {
  if (!syncFolders) {
    return true;
  }

  final boolean lengthsMatch = target.getLen() == source.getLen();

  // Block size only matters when it is among the preserved attributes.
  boolean blockSizeAcceptable = source.getBlockSize() == target.getBlockSize();
  if (!blockSizeAcceptable) {
    blockSizeAcceptable = !preserve.contains(FileAttribute.BLOCKSIZE);
  }

  if (!lengthsMatch || !blockSizeAcceptable) {
    return false;
  }
  if (skipCrc) {
    return true;
  }
  return DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
      targetFS, target.getPath());
}
/**
 * Builds one input split per chunk, capped at the configured number of map
 * tasks, assigning each used chunk to the map task with the same index.
 * The resulting split count is published to the job configuration.
 *
 * @param jobContext context supplying the job ID and configuration.
 * @param chunks chunks available for assignment.
 * @return the splits, one per assigned chunk.
 * @throws IOException propagated from chunk assignment.
 */
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
    throws IOException {
  final int splitCount = Math.min(
      getNumMapTasks(jobContext.getConfiguration()), chunks.size());
  final List<InputSplit> result = new ArrayList<InputSplit>(splitCount);

  int index = 0;
  while (index < splitCount) {
    TaskID mapTaskId = new TaskID(jobContext.getJobID(), TaskType.MAP, index);
    DynamicInputChunk chunk = chunks.get(index);
    chunk.assignTo(mapTaskId);
    // Setting non-zero length for FileSplit size, to avoid a possible
    // future when 0-sized file-splits are considered "empty" and skipped
    // over.
    result.add(new FileSplit(chunk.getPath(), 0,
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
    index++;
  }

  DistCpUtils.publish(jobContext.getConfiguration(),
      CONF_LABEL_NUM_SPLITS, result.size());
  return result;
}
/**
 * Writes this object's option values into the given configuration, one
 * entry per DistCp option switch. These will be used in the
 * Mapper/committer.
 *
 * @param conf - Configuration object to which the options need to be added
 */
public void appendToConf(Configuration conf) {
  // Boolean-style switches, stored as their string form.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, String.valueOf(atomicCommit));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES, String.valueOf(ignoreFailures));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS, String.valueOf(syncFolder));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING, String.valueOf(deleteMissing));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, String.valueOf(overwrite));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, String.valueOf(append));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF, String.valueOf(useDiff));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC));

  // Bandwidth and the packed preserve-attribute set.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, String.valueOf(mapBandwidth));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, DistCpUtils.packAttributes(preserveStatus));
}
/**
 * Runs the mapper over every path in {@code pathList} and asserts that the
 * SKIP counter equals {@code nFiles}. Any exception fails the test.
 *
 * @param fs file system used to look up each path's status.
 * @param copyMapper mapper under test.
 * @param context mapper context whose SKIP counter is checked.
 */
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
  try {
    for (Path path : pathList) {
      Text relativePath = new Text(
          DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path));
      CopyListingFileStatus listingStatus =
          new CopyListingFileStatus(fs.getFileStatus(path));
      copyMapper.map(relativePath, listingStatus, context);
    }

    long skipped = context.getCounter(CopyMapper.Counter.SKIP).getValue();
    Assert.assertEquals(nFiles, skipped);
  } catch (Exception exception) {
    Assert.fail("Caught unexpected exception:" + exception.getMessage());
  }
}
/**
 * Converts a {@code FileStatus} into a {@code CopyListingFileStatus} using
 * this object's file system and preserve flags.
 *
 * @param fileStatus status to convert.
 * @return the converted status.
 * @throws CircusTrainException wrapping any {@code IOException} raised by
 *         the conversion.
 */
@Override
public CopyListingFileStatus apply(FileStatus fileStatus) {
  final CopyListingFileStatus converted;
  try {
    converted = DistCpUtils.toCopyListingFileStatus(
        fileSystem, fileStatus, preserveAcls, preserveXAttrs, preserveRawXAttrs);
  } catch (IOException e) {
    throw new CircusTrainException("Error transforming to CopyListingFileStatus: " + fileStatus, e);
  }
  return converted;
}
/**
 * Walks the directory tree under {@code sourceStatus} with an explicit
 * stack, writing every child found to the copy listing and descending into
 * each child that is a non-empty directory.
 *
 * @param fileListWriter writer receiving one entry per child.
 * @param sourceStatus status of the directory where traversal starts.
 * @param sourcePathRoot root used to obtain the source file system and
 *        passed on to {@code writeToFileListing}.
 * @param options options supplying the preserve flags.
 * @throws IOException propagated from file-system access or the writer.
 */
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
                                       FileStatus sourceStatus,
                                       Path sourcePathRoot,
                                       DistCpOptions options)
    throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
  final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
  final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
  Stack<FileStatus> pathStack = new Stack<FileStatus>();
  pathStack.push(sourceStatus);

  while (!pathStack.isEmpty()) {
    for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
      if (LOG.isDebugEnabled()) {
        // Log the child being recorded, not the traversal root; the root
        // path is the same on every iteration and says nothing useful.
        LOG.debug("Recording source-path: "
            + child.getPath() + " for copy.");
      }
      // ACLs and extended attributes are only requested for directories.
      CopyListingFileStatus childCopyListingStatus =
          DistCpUtils.toCopyListingFileStatus(sourceFS, child,
              preserveAcls && child.isDirectory(),
              preserveXAttrs && child.isDirectory(),
              preserveRawXattrs && child.isDirectory());
      writeToFileListing(fileListWriter, childCopyListingStatus,
          sourcePathRoot, options);
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        if (LOG.isDebugEnabled()) {
          // Same correction: report the directory actually being entered.
          LOG.debug("Traversing non-empty source dir: "
              + child.getPath());
        }
        pathStack.push(child);
      }
    }
  }
}
/**
 * Builds the DistCp job: names it, wires the input format, mapper and
 * output classes, disables map speculation, sets the map count from the
 * input options, and appends those options to the job configuration.
 *
 * @return Reference to job object.
 * @throws IOException - Exception if any
 */
private Job createJob() throws IOException {
  // A user-supplied job name is appended to the fixed "distcp" prefix.
  final String userChosenName = getConf().get(JobContext.JOB_NAME);
  final String jobName = (userChosenName == null)
      ? "distcp"
      : "distcp" + ": " + userChosenName;

  final Job job = Job.getInstance(getConf());
  job.setJobName(jobName);

  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
  job.setJarByClass(CopyMapper.class);
  configureOutputFormat(job);

  // Map-only job: no reducers.
  job.setMapperClass(CopyMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(CopyOutputFormat.class);

  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
  final String maxMaps = String.valueOf(inputOptions.getMaxMaps());
  job.getConfiguration().set(JobContext.NUM_MAPS, maxMaps);

  if (inputOptions.getSslConfigurationFile() != null) {
    setupSSLConfig(job);
  }

  inputOptions.appendToConf(job.getConfiguration());
  return job;
}
/**
 * Sets the task status to a progress line of the form
 * {@code <percent>% <description> [<read>/<total>]}.
 *
 * @param totalBytesRead bytes read so far.
 * @param context mapper context whose status is updated.
 * @param sourceFileStatus status of the source file; its length is the
 *        total against which progress is reported.
 */
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
                                 FileStatus sourceFileStatus) {
  final String percentage = DistCpUtils.getFormatter()
      .format(totalBytesRead * 100.0f / sourceFileStatus.getLen());
  final String readSoFar = DistCpUtils.getStringDescriptionFor(totalBytesRead);
  final String total =
      DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen());

  final String status =
      percentage + "% " + description + " [" + readSoFar + '/' + total + ']';
  context.setStatus(status);
}
/**
 * Implementation of InputFormat::getSplits(). Reads the map count and the
 * total bytes to be copied from the job configuration and delegates to the
 * three-argument {@code getSplits}. A map count of zero yields an empty
 * list.
 *
 * @param context JobContext for the job.
 * @return The list of input-splits; empty when the map count is zero.
 * @throws IOException propagated from the delegated split computation.
 * @throws InterruptedException declared by the overridden method.
 */
@Override
public List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  final int numSplits = DistCpUtils.getInt(configuration, JobContext.NUM_MAPS);

  if (numSplits != 0) {
    final long totalBytesToCopy = DistCpUtils.getLong(configuration,
        DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED);
    return getSplits(configuration, numSplits, totalBytesToCopy);
  }
  return new ArrayList<InputSplit>();
}
/**
 * Creates a fresh sequence-file record reader and initializes it on a split
 * that spans the chunk file from offset 0 to its full size.
 *
 * @param taskAttemptContext context passed to the reader's initialization.
 * @throws IOException propagated from size lookup or reader initialization.
 * @throws InterruptedException propagated from reader initialization.
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();

  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0,
      DistCpUtils.getFileSize(chunkFilePath, configuration), null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
/**
 * Copies once, appends data to the source, then runs the mapper again with
 * the append switch enabled and checks the copy plus the BYTESCOPIED and
 * COPY counters.
 */
@Test
public void testCopyWithAppend() throws Exception {
  final FileSystem fs = cluster.getFileSystem();

  // do the first distcp
  testCopy(false);
  // start appending data to source
  appendSourceData();

  // do the distcp again with -update and -append option
  CopyMapper copyMapper = new CopyMapper();
  StubContext stubContext = new StubContext(getConfiguration(), null, 0);
  Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
      stubContext.getContext();

  // Enable append
  final String appendLabel = DistCpOptionSwitch.APPEND.getConfigLabel();
  context.getConfiguration().setBoolean(appendLabel, true);
  copyMapper.setup(context);

  for (Path path : pathList) {
    Text relativePath = new Text(
        DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path));
    CopyListingFileStatus listingStatus = new CopyListingFileStatus(
        cluster.getFileSystem().getFileStatus(path));
    copyMapper.map(relativePath, listingStatus, context);
  }

  verifyCopy(fs, false);

  // verify that we only copied new appended data
  final long bytesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue();
  Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, bytesCopied);

  final long filesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.COPY).getValue();
  Assert.assertEquals(pathList.size(), filesCopied);
}
/**
 * Points the target work path at an hftp URI containing glob characters and
 * expects mapping the first source path to throw. Any {@code Exception}
 * raised inside the try block is swallowed, so the test only fails when the
 * explicit {@code Assert.fail} is reached.
 */
@Test(timeout=40000)
public void testMakeDirFailure() {
  try {
    deleteState();
    createSourceData();

    FileSystem fs = cluster.getFileSystem();
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
        stubContext.getContext();

    Configuration configuration = context.getConfiguration();
    String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
        .makeQualified(fs.getUri(), fs.getWorkingDirectory())
        .toString();
    configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
    copyMapper.setup(context);

    Text relativePath = new Text(
        DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0)));
    CopyListingFileStatus listingStatus =
        new CopyListingFileStatus(fs.getFileStatus(pathList.get(0)));
    copyMapper.map(relativePath, listingStatus, context);

    Assert.fail("There should have been an exception.");
  } catch (Exception expected) {
    // An exception is the expected outcome of this test.
  }
}
/**
 * Creates source data with a different block size, runs the mapper with an
 * empty preserve-attribute set, and expects a failure whose root-cause
 * message mentions both "pb" and "skipCrc".
 */
@Test(timeout=40000)
public void testCopyFailOnBlockSizeDifference() {
  try {
    deleteState();
    createSourceDataWithDifferentBlockSize();

    FileSystem fs = cluster.getFileSystem();
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
        stubContext.getContext();

    // Preserve nothing, so block size is not carried over to the target.
    Configuration configuration = context.getConfiguration();
    EnumSet<DistCpOptions.FileAttribute> fileAttributes =
        EnumSet.noneOf(DistCpOptions.FileAttribute.class);
    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
        DistCpUtils.packAttributes(fileAttributes));
    copyMapper.setup(context);

    for (Path path : pathList) {
      final FileStatus fileStatus = fs.getFileStatus(path);
      Text relativePath = new Text(
          DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path));
      copyMapper.map(relativePath, new CopyListingFileStatus(fileStatus), context);
    }

    Assert.fail("Copy should have failed because of block-size difference.");
  } catch (Exception exception) {
    // Check that the exception suggests the use of -pb/-skipCrc.
    final String rootMessage = exception.getCause().getCause().getMessage();
    Assert.assertTrue("Failure exception should have suggested the use of -pb.",
        rootMessage.contains("pb"));
    Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.",
        rootMessage.contains("skipCrc"));
  }
}
/**
 * Walks the directory tree under {@code sourceStatus} with an explicit
 * stack, writing every child found to the copy listing and descending into
 * each child that is a non-empty directory.
 *
 * @param fileListWriter writer receiving one entry per child.
 * @param sourceStatus status of the directory where traversal starts.
 * @param sourcePathRoot root used to obtain the source file system and
 *        passed on to {@code writeToFileListing}.
 * @param options options supplying the preserve flags.
 * @throws IOException propagated from file-system access or the writer.
 */
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
                                       FileStatus sourceStatus,
                                       Path sourcePathRoot,
                                       DistCpOptions options)
    throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
  final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
  final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
  Stack<FileStatus> pathStack = new Stack<FileStatus>();
  pathStack.push(sourceStatus);

  while (!pathStack.isEmpty()) {
    for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
      if (LOG.isDebugEnabled()) {
        // Log the child being recorded, not the traversal root; the root
        // path is the same on every iteration and says nothing useful.
        LOG.debug("Recording source-path: "
            + child.getPath() + " for copy.");
      }
      // ACLs and extended attributes are only requested for directories.
      CopyListingFileStatus childCopyListingStatus =
          DistCpUtils.toCopyListingFileStatus(sourceFS, child,
              preserveAcls && child.isDirectory(),
              preserveXAttrs && child.isDirectory(),
              preserveRawXattrs && child.isDirectory());
      writeToFileListing(fileListWriter, childCopyListingStatus,
          sourcePathRoot, options);
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        if (LOG.isDebugEnabled()) {
          // Same correction: report the directory actually being entered.
          LOG.debug("Traversing non-empty source dir: "
              + child.getPath());
        }
        pathStack.push(child);
      }
    }
  }
}
/**
 * Builds the DistCp job: names it, wires the input format, mapper and
 * output classes, disables map speculation, sets the map count from the
 * input options, and appends those options to the job configuration.
 *
 * @return Reference to job object.
 * @throws IOException - Exception if any
 */
private Job createJob() throws IOException {
  // A user-supplied job name is appended to the fixed "distcp" prefix.
  final String userChosenName = getConf().get(JobContext.JOB_NAME);
  final String jobName = (userChosenName == null)
      ? "distcp"
      : "distcp" + ": " + userChosenName;

  final Job job = Job.getInstance(getConf());
  job.setJobName(jobName);

  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
  job.setJarByClass(CopyMapper.class);
  configureOutputFormat(job);

  // Map-only job: no reducers.
  job.setMapperClass(CopyMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(CopyOutputFormat.class);

  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
  final String maxMaps = String.valueOf(inputOptions.getMaxMaps());
  job.getConfiguration().set(JobContext.NUM_MAPS, maxMaps);

  if (inputOptions.getSslConfigurationFile() != null) {
    setupSSLConfig(job);
  }

  inputOptions.appendToConf(job.getConfiguration());
  return job;
}
/**
 * Sets the task status to a progress line of the form
 * {@code <percent>% <description> [<read>/<total>]}.
 *
 * @param totalBytesRead bytes read so far.
 * @param context mapper context whose status is updated.
 * @param sourceFileStatus status of the source file; its length is the
 *        total against which progress is reported.
 */
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
                                 FileStatus sourceFileStatus) {
  final String percentage = DistCpUtils.getFormatter()
      .format(totalBytesRead * 100.0f / sourceFileStatus.getLen());
  final String readSoFar = DistCpUtils.getStringDescriptionFor(totalBytesRead);
  final String total =
      DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen());

  final String status =
      percentage + "% " + description + " [" + readSoFar + '/' + total + ']';
  context.setStatus(status);
}
/**
 * Implementation of InputFormat::getSplits(). Reads the map count and the
 * total bytes to be copied from the job configuration and delegates to the
 * three-argument {@code getSplits}. A map count of zero yields an empty
 * list.
 *
 * @param context JobContext for the job.
 * @return The list of input-splits; empty when the map count is zero.
 * @throws IOException propagated from the delegated split computation.
 * @throws InterruptedException declared by the overridden method.
 */
@Override
public List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  final int numSplits = DistCpUtils.getInt(configuration, JobContext.NUM_MAPS);

  if (numSplits != 0) {
    final long totalBytesToCopy = DistCpUtils.getLong(configuration,
        DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED);
    return getSplits(configuration, numSplits, totalBytesToCopy);
  }
  return new ArrayList<InputSplit>();
}
/**
 * Creates a fresh sequence-file record reader and initializes it on a split
 * that spans the chunk file from offset 0 to its full size.
 *
 * @param taskAttemptContext context passed to the reader's initialization.
 * @throws IOException propagated from size lookup or reader initialization.
 * @throws InterruptedException propagated from reader initialization.
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();

  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0,
      DistCpUtils.getFileSize(chunkFilePath, configuration), null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
/**
 * Copies once, appends data to the source, then runs the mapper again with
 * the append switch enabled and checks the copy plus the BYTESCOPIED and
 * COPY counters.
 */
@Test
public void testCopyWithAppend() throws Exception {
  final FileSystem fs = cluster.getFileSystem();

  // do the first distcp
  testCopy(false);
  // start appending data to source
  appendSourceData();

  // do the distcp again with -update and -append option
  CopyMapper copyMapper = new CopyMapper();
  StubContext stubContext = new StubContext(getConfiguration(), null, 0);
  Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
      stubContext.getContext();

  // Enable append
  final String appendLabel = DistCpOptionSwitch.APPEND.getConfigLabel();
  context.getConfiguration().setBoolean(appendLabel, true);
  copyMapper.setup(context);

  for (Path path : pathList) {
    Text relativePath = new Text(
        DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path));
    CopyListingFileStatus listingStatus = new CopyListingFileStatus(
        cluster.getFileSystem().getFileStatus(path));
    copyMapper.map(relativePath, listingStatus, context);
  }

  verifyCopy(fs, false);

  // verify that we only copied new appended data
  final long bytesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue();
  Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, bytesCopied);

  final long filesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.COPY).getValue();
  Assert.assertEquals(pathList.size(), filesCopied);
}
/**
 * Points the target work path at an hftp URI containing glob characters and
 * expects mapping the first source path to throw. Any {@code Exception}
 * raised inside the try block is swallowed, so the test only fails when the
 * explicit {@code Assert.fail} is reached.
 */
@Test(timeout=40000)
public void testMakeDirFailure() {
  try {
    deleteState();
    createSourceData();

    FileSystem fs = cluster.getFileSystem();
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
        stubContext.getContext();

    Configuration configuration = context.getConfiguration();
    String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
        .makeQualified(fs.getUri(), fs.getWorkingDirectory())
        .toString();
    configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
    copyMapper.setup(context);

    Text relativePath = new Text(
        DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0)));
    CopyListingFileStatus listingStatus =
        new CopyListingFileStatus(fs.getFileStatus(pathList.get(0)));
    copyMapper.map(relativePath, listingStatus, context);

    Assert.fail("There should have been an exception.");
  } catch (Exception expected) {
    // An exception is the expected outcome of this test.
  }
}