org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader#org.apache.hadoop.tools.util.DistCpUtils源码实例Demo

下面列出了 org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader#org.apache.hadoop.tools.util.DistCpUtils 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: hadoop   文件: SimpleCopyListing.java
/**
 * Appends one record to the copy-listing sequence file unless
 * {@code shouldCopy} rejects the path. The record key is the path relative
 * to {@code sourcePathRoot} and the value is {@code fileStatus}. For every
 * record written, {@code totalPaths} is incremented and, for non-directories,
 * the length is added to {@code totalBytesToCopy}.
 */
private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                CopyListingFileStatus fileStatus,
                                Path sourcePathRoot,
                                DistCpOptions options) throws IOException {
  final Path fullPath = fileStatus.getPath();
  if (LOG.isDebugEnabled()) {
    LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot, fullPath)
        + ", FULL PATH: " + fullPath);
  }

  if (shouldCopy(fullPath, options)) {
    final Text relativeKey =
        new Text(DistCpUtils.getRelativePath(sourcePathRoot, fullPath));
    fileListWriter.append(relativeKey, fileStatus);
    // Insert a sync point after each record.
    fileListWriter.sync();

    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
 
源代码2 项目: hadoop   文件: RetriableFileCopyCommand.java
/**
 * Verifies that source and target have equal checksums.
 *
 * @throws IOException if the checksums differ. When the two files also
 *     differ in block size, the message suggests -pb or -skipCrc.
 */
private void compareCheckSums(FileSystem sourceFS, Path source,
    FileChecksum sourceChecksum, FileSystem targetFS, Path target)
    throws IOException {
  final boolean checksumsMatch = DistCpUtils.checksumsAreEqual(sourceFS, source,
      sourceChecksum, targetFS, target);
  if (checksumsMatch) {
    return;
  }

  final StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ");
  errorMessage.append(source).append(" and ").append(target).append(".");

  final long sourceBlockSize = sourceFS.getFileStatus(source).getBlockSize();
  final long targetBlockSize = targetFS.getFileStatus(target).getBlockSize();
  if (sourceBlockSize != targetBlockSize) {
    errorMessage.append(" Source and target differ in block-size.")
        .append(" Use -pb to preserve block-sizes during copy.")
        .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
        .append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
  }
  throw new IOException(errorMessage.toString());
}
 
源代码3 项目: hadoop   文件: CopyMapper.java
/**
 * Implementation of the Mapper::setup() method. This extracts the DistCp-
 * options specified in the Job's configuration, to set up the Job.
 *
 * <p>If the final target path is an existing file, overwrite is forced on.
 * If an SSL configuration label is present, the SSL configuration is
 * initialized as well.
 *
 * @param context Mapper's context.
 * @throws IOException On IO failure.
 * @throws InterruptedException If the job is interrupted.
 */
@Override
public void setup(Context context) throws IOException, InterruptedException {
  conf = context.getConfiguration();

  syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
  ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
  skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
  overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
  append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
  preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
      PRESERVE_STATUS.getConfigLabel()));

  targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
  Path targetFinalPath = new Path(conf.get(
          DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
  targetFS = targetFinalPath.getFileSystem(conf);

  // FileSystem#isFile already returns false for a missing path, so a prior
  // exists() probe only adds a redundant round-trip to the target FS.
  if (targetFS.isFile(targetFinalPath)) {
    overWrite = true; // When target is an existing file, overwrite it.
  }

  if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
    initializeSSLConf(context);
  }
}
 
源代码4 项目: hadoop   文件: CopyMapper.java
/**
 * Decides whether copying {@code source} onto {@code target} can be skipped.
 * When {@code syncFolders} is false, the copy is always skipped. Otherwise
 * the lengths must match, and the block sizes must match when BLOCKSIZE is
 * being preserved. Finally, unless {@code skipCrc} is set, the checksums
 * must agree.
 */
private boolean canSkip(FileSystem sourceFS, FileStatus source,
    FileStatus target) throws IOException {
  if (!syncFolders) {
    return true;
  }
  if (target.getLen() != source.getLen()) {
    return false;
  }
  final boolean blockSizeMustMatch = preserve.contains(FileAttribute.BLOCKSIZE);
  if (blockSizeMustMatch && source.getBlockSize() != target.getBlockSize()) {
    return false;
  }
  if (skipCrc) {
    return true;
  }
  return DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
      targetFS, target.getPath());
}
 
源代码5 项目: hadoop   文件: DynamicInputFormat.java
/**
 * Builds at most {@code getNumMapTasks(...)} splits, one per chunk. Chunk
 * {@code i} is assigned to map task {@code i}. The resulting split count is
 * published under {@code CONF_LABEL_NUM_SPLITS}.
 */
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
        throws IOException {
  final int mapTaskCount = getNumMapTasks(jobContext.getConfiguration());
  final int splitCount = Math.min(mapTaskCount, chunks.size());
  final List<InputSplit> splits = new ArrayList<InputSplit>(splitCount);

  for (int taskIndex = 0; taskIndex < splitCount; taskIndex++) {
    final DynamicInputChunk chunk = chunks.get(taskIndex);
    chunk.assignTo(new TaskID(jobContext.getJobID(), TaskType.MAP, taskIndex));
    // Setting non-zero length for FileSplit size, to avoid a possible
    // future when 0-sized file-splits are considered "empty" and skipped
    // over.
    final long splitLength = getMinRecordsPerChunk(jobContext.getConfiguration());
    splits.add(new FileSplit(chunk.getPath(), 0, splitLength, null));
  }

  DistCpUtils.publish(jobContext.getConfiguration(),
      CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
 
源代码6 项目: hadoop   文件: DistCpOptions.java
/**
 * Copies every DistCp option held by this object into {@code conf}, so the
 * mapper and committer can read them back.
 *
 * @param conf Configuration object to which the options are added
 */
public void appendToConf(Configuration conf) {
  // Commit and failure handling.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, String.valueOf(atomicCommit));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES, String.valueOf(ignoreFailures));

  // How source and target trees are reconciled.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS, String.valueOf(syncFolder));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING, String.valueOf(deleteMissing));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, String.valueOf(overwrite));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, String.valueOf(append));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF, String.valueOf(useDiff));

  // Per-file copy behaviour.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, String.valueOf(mapBandwidth));

  final String packedPreserveStatus = DistCpUtils.packAttributes(preserveStatus);
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, packedPreserveStatus);
}
 
源代码7 项目: hadoop   文件: TestCopyMapper.java
/**
 * Maps every path in {@code pathList} through {@code copyMapper}, then
 * asserts that the SKIP counter equals {@code nFiles}.
 *
 * <p>Any exception thrown while mapping fails the test. The exception is
 * attached as the cause so its stack trace appears in the report.
 */
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
  try {
    for (Path path : pathList) {
      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
              new CopyListingFileStatus(fs.getFileStatus(path)), context);
    }

    Assert.assertEquals(nFiles,
            context.getCounter(CopyMapper.Counter.SKIP).getValue());
  }
  catch (Exception exception) {
    // Keep the exception as the cause rather than reducing it to its
    // message, so the failure shows where it was thrown.
    AssertionError failure =
        new AssertionError("Caught unexpected exception:" + exception.getMessage());
    failure.initCause(exception);
    throw failure;
  }
}
 
源代码8 项目: big-c   文件: SimpleCopyListing.java
/**
 * Appends one record to the copy-listing sequence file unless
 * {@code shouldCopy} rejects the path. The record key is the path relative
 * to {@code sourcePathRoot} and the value is {@code fileStatus}. For every
 * record written, {@code totalPaths} is incremented and, for non-directories,
 * the length is added to {@code totalBytesToCopy}.
 */
private void writeToFileListing(SequenceFile.Writer fileListWriter,
                                CopyListingFileStatus fileStatus,
                                Path sourcePathRoot,
                                DistCpOptions options) throws IOException {
  final Path fullPath = fileStatus.getPath();
  if (LOG.isDebugEnabled()) {
    LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot, fullPath)
        + ", FULL PATH: " + fullPath);
  }

  if (shouldCopy(fullPath, options)) {
    final Text relativeKey =
        new Text(DistCpUtils.getRelativePath(sourcePathRoot, fullPath));
    fileListWriter.append(relativeKey, fileStatus);
    // Insert a sync point after each record.
    fileListWriter.sync();

    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
 
源代码9 项目: big-c   文件: RetriableFileCopyCommand.java
/**
 * Verifies that source and target have equal checksums.
 *
 * @throws IOException if the checksums differ. When the two files also
 *     differ in block size, the message suggests -pb or -skipCrc.
 */
private void compareCheckSums(FileSystem sourceFS, Path source,
    FileChecksum sourceChecksum, FileSystem targetFS, Path target)
    throws IOException {
  final boolean checksumsMatch = DistCpUtils.checksumsAreEqual(sourceFS, source,
      sourceChecksum, targetFS, target);
  if (checksumsMatch) {
    return;
  }

  final StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ");
  errorMessage.append(source).append(" and ").append(target).append(".");

  final long sourceBlockSize = sourceFS.getFileStatus(source).getBlockSize();
  final long targetBlockSize = targetFS.getFileStatus(target).getBlockSize();
  if (sourceBlockSize != targetBlockSize) {
    errorMessage.append(" Source and target differ in block-size.")
        .append(" Use -pb to preserve block-sizes during copy.")
        .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
        .append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
  }
  throw new IOException(errorMessage.toString());
}
 
源代码10 项目: big-c   文件: CopyMapper.java
/**
 * Implementation of the Mapper::setup() method. This extracts the DistCp-
 * options specified in the Job's configuration, to set up the Job.
 *
 * <p>If the final target path is an existing file, overwrite is forced on.
 * If an SSL configuration label is present, the SSL configuration is
 * initialized as well.
 *
 * @param context Mapper's context.
 * @throws IOException On IO failure.
 * @throws InterruptedException If the job is interrupted.
 */
@Override
public void setup(Context context) throws IOException, InterruptedException {
  conf = context.getConfiguration();

  syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
  ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
  skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
  overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
  append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
  preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
      PRESERVE_STATUS.getConfigLabel()));

  targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
  Path targetFinalPath = new Path(conf.get(
          DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
  targetFS = targetFinalPath.getFileSystem(conf);

  // FileSystem#isFile already returns false for a missing path, so a prior
  // exists() probe only adds a redundant round-trip to the target FS.
  if (targetFS.isFile(targetFinalPath)) {
    overWrite = true; // When target is an existing file, overwrite it.
  }

  if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
    initializeSSLConf(context);
  }
}
 
源代码11 项目: big-c   文件: CopyMapper.java
/**
 * Decides whether copying {@code source} onto {@code target} can be skipped.
 * When {@code syncFolders} is false, the copy is always skipped. Otherwise
 * the lengths must match, and the block sizes must match when BLOCKSIZE is
 * being preserved. Finally, unless {@code skipCrc} is set, the checksums
 * must agree.
 */
private boolean canSkip(FileSystem sourceFS, FileStatus source,
    FileStatus target) throws IOException {
  if (!syncFolders) {
    return true;
  }
  if (target.getLen() != source.getLen()) {
    return false;
  }
  final boolean blockSizeMustMatch = preserve.contains(FileAttribute.BLOCKSIZE);
  if (blockSizeMustMatch && source.getBlockSize() != target.getBlockSize()) {
    return false;
  }
  if (skipCrc) {
    return true;
  }
  return DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
      targetFS, target.getPath());
}
 
源代码12 项目: big-c   文件: DynamicInputFormat.java
/**
 * Builds at most {@code getNumMapTasks(...)} splits, one per chunk. Chunk
 * {@code i} is assigned to map task {@code i}. The resulting split count is
 * published under {@code CONF_LABEL_NUM_SPLITS}.
 */
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
        throws IOException {
  final int mapTaskCount = getNumMapTasks(jobContext.getConfiguration());
  final int splitCount = Math.min(mapTaskCount, chunks.size());
  final List<InputSplit> splits = new ArrayList<InputSplit>(splitCount);

  for (int taskIndex = 0; taskIndex < splitCount; taskIndex++) {
    final DynamicInputChunk chunk = chunks.get(taskIndex);
    chunk.assignTo(new TaskID(jobContext.getJobID(), TaskType.MAP, taskIndex));
    // Setting non-zero length for FileSplit size, to avoid a possible
    // future when 0-sized file-splits are considered "empty" and skipped
    // over.
    final long splitLength = getMinRecordsPerChunk(jobContext.getConfiguration());
    splits.add(new FileSplit(chunk.getPath(), 0, splitLength, null));
  }

  DistCpUtils.publish(jobContext.getConfiguration(),
      CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
 
源代码13 项目: big-c   文件: DistCpOptions.java
/**
 * Copies every DistCp option held by this object into {@code conf}, so the
 * mapper and committer can read them back.
 *
 * @param conf Configuration object to which the options are added
 */
public void appendToConf(Configuration conf) {
  // Commit and failure handling.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, String.valueOf(atomicCommit));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES, String.valueOf(ignoreFailures));

  // How source and target trees are reconciled.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS, String.valueOf(syncFolder));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING, String.valueOf(deleteMissing));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, String.valueOf(overwrite));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, String.valueOf(append));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF, String.valueOf(useDiff));

  // Per-file copy behaviour.
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC));
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, String.valueOf(mapBandwidth));

  final String packedPreserveStatus = DistCpUtils.packAttributes(preserveStatus);
  DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, packedPreserveStatus);
}
 
源代码14 项目: big-c   文件: TestCopyMapper.java
/**
 * Maps every path in {@code pathList} through {@code copyMapper}, then
 * asserts that the SKIP counter equals {@code nFiles}.
 *
 * <p>Any exception thrown while mapping fails the test. The exception is
 * attached as the cause so its stack trace appears in the report.
 */
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
  try {
    for (Path path : pathList) {
      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
              new CopyListingFileStatus(fs.getFileStatus(path)), context);
    }

    Assert.assertEquals(nFiles,
            context.getCounter(CopyMapper.Counter.SKIP).getValue());
  }
  catch (Exception exception) {
    // Keep the exception as the cause rather than reducing it to its
    // message, so the failure shows where it was thrown.
    AssertionError failure =
        new AssertionError("Caught unexpected exception:" + exception.getMessage());
    failure.initCause(exception);
    throw failure;
  }
}
 
/**
 * Converts {@code fileStatus} to a {@link CopyListingFileStatus} by calling
 * {@code DistCpUtils.toCopyListingFileStatus} with this object's
 * {@code fileSystem} and its preserveAcls / preserveXAttrs /
 * preserveRawXAttrs flags.
 *
 * @throws CircusTrainException wrapping any IOException raised by the
 *     conversion
 */
@Override
public CopyListingFileStatus apply(FileStatus fileStatus) {
  final CopyListingFileStatus converted;
  try {
    converted = DistCpUtils.toCopyListingFileStatus(fileSystem, fileStatus,
        preserveAcls, preserveXAttrs, preserveRawXAttrs);
  } catch (IOException conversionFailure) {
    throw new CircusTrainException(
        "Error transforming to CopyListingFileStatus: " + fileStatus, conversionFailure);
  }
  return converted;
}
 
源代码16 项目: hadoop   文件: SimpleCopyListing.java
/**
 * Writes every entry below {@code sourceStatus} to the copy listing. The
 * tree is walked iteratively with an explicit stack rather than recursion.
 *
 * <p>Each child is converted with {@code DistCpUtils.toCopyListingFileStatus};
 * the ACL, XAttr and raw-XAttr preserve flags are passed as {@code true}
 * only for directory children. Non-empty child directories are pushed onto
 * the stack so their contents are listed in turn.
 */
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
                                       FileStatus sourceStatus,
                                       Path sourcePathRoot,
                                       DistCpOptions options)
                                       throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
  final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
  final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
  Stack<FileStatus> pathStack = new Stack<FileStatus>();
  pathStack.push(sourceStatus);

  while (!pathStack.isEmpty()) {
    for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
      // Log the entry actually being recorded. Logging sourceStatus here
      // printed the traversal root once per child.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Recording source-path: "
                  + child.getPath() + " for copy.");
      }
      final boolean isDir = child.isDirectory();
      CopyListingFileStatus childCopyListingStatus =
        DistCpUtils.toCopyListingFileStatus(sourceFS, child,
          preserveAcls && isDir,
          preserveXAttrs && isDir,
          preserveRawXattrs && isDir);
      writeToFileListing(fileListWriter, childCopyListingStatus,
           sourcePathRoot, options);
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Traversing non-empty source dir: "
                     + child.getPath());
        }
        pathStack.push(child);
      }
    }
  }
}
 
源代码17 项目: hadoop   文件: DistCp.java
/**
 * Builds the DistCp job for submission. It is a map-only job running
 * {@link CopyMapper} with speculative map execution disabled, and the map
 * count is taken from the input options. All DistCp options are then
 * appended to the job configuration.
 *
 * @return Reference to job object.
 * @throws IOException - Exception if any
 */
private Job createJob() throws IOException {
  String jobName = "distcp";
  final String userChosenName = getConf().get(JobContext.JOB_NAME);
  if (userChosenName != null) {
    jobName += ": " + userChosenName;
  }

  final Job job = Job.getInstance(getConf());
  job.setJobName(jobName);
  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
  job.setJarByClass(CopyMapper.class);
  configureOutputFormat(job);

  // Map-only: CopyMapper emits Text/Text and there is no reduce phase.
  job.setMapperClass(CopyMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(CopyOutputFormat.class);

  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
  final String maxMaps = String.valueOf(inputOptions.getMaxMaps());
  job.getConfiguration().set(JobContext.NUM_MAPS, maxMaps);

  if (inputOptions.getSslConfigurationFile() != null) {
    setupSSLConfig(job);
  }

  inputOptions.appendToConf(job.getConfiguration());
  return job;
}
 
源代码18 项目: hadoop   文件: RetriableFileCopyCommand.java
/**
 * Publishes copy progress as the task status, formatted as
 * "{percent}% {description} [{bytes read}/{file length}]".
 *
 * @param totalBytesRead bytes copied so far
 * @param context mapper context whose status is set
 * @param sourceFileStatus status of the file being copied; its length is
 *     the denominator of the percentage
 */
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
                                 FileStatus sourceFileStatus) {
  final long fileLength = sourceFileStatus.getLen();
  // Dividing by a zero length would yield NaN/Infinity and a garbled status,
  // so a zero-length source is reported as 100%.
  final float percentComplete =
      fileLength > 0 ? totalBytesRead * 100.0f / fileLength : 100.0f;
  StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
          .format(percentComplete));
  message.append("% ")
          .append(description).append(" [")
          .append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
          .append('/')
          .append(DistCpUtils.getStringDescriptionFor(fileLength))
          .append(']');
  context.setStatus(message.toString());
}
 
源代码19 项目: hadoop   文件: UniformSizeInputFormat.java
/**
 * Implementation of InputFormat::getSplits(). Returns input splits whose
 * byte counts to be copied are approximately equal. The split count comes
 * from {@code JobContext.NUM_MAPS} and the total from
 * {@code CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED}. A split count of zero yields
 * an empty list.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException propagated from reading or computing the splits
 * @throws InterruptedException propagated from computing the splits
 */
@Override
public List<InputSplit> getSplits(JobContext context)
                    throws IOException, InterruptedException {
  final Configuration configuration = context.getConfiguration();
  final int numSplits = DistCpUtils.getInt(configuration, JobContext.NUM_MAPS);
  if (numSplits == 0) {
    return new ArrayList<InputSplit>();
  }

  final long totalBytesToCopy = DistCpUtils.getLong(configuration,
      DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED);
  return getSplits(configuration, numSplits, totalBytesToCopy);
}
 
源代码20 项目: hadoop   文件: DynamicInputChunk.java
/**
 * Creates a new SequenceFileRecordReader in {@code reader} and initializes
 * it on one {@link FileSplit} covering all of {@code chunkFilePath}.
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();
  final long chunkLength = DistCpUtils.getFileSize(chunkFilePath, configuration);
  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0, chunkLength, null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
 
源代码21 项目: hadoop   文件: TestCopyMapper.java
@Test
public void testCopyWithAppend() throws Exception {
  final FileSystem fs = cluster.getFileSystem();
  // First distcp pass.
  testCopy(false);
  // Append data to the source files before copying again.
  appendSourceData();

  // Second pass through a fresh mapper with the APPEND switch enabled.
  final CopyMapper copyMapper = new CopyMapper();
  final StubContext stubContext = new StubContext(getConfiguration(), null, 0);
  final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
      stubContext.getContext();
  context.getConfiguration().setBoolean(
      DistCpOptionSwitch.APPEND.getConfigLabel(), true);
  copyMapper.setup(context);
  for (Path sourceFile : pathList) {
    final Text relativePath =
        new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), sourceFile));
    final CopyListingFileStatus sourceStatus = new CopyListingFileStatus(
        cluster.getFileSystem().getFileStatus(sourceFile));
    copyMapper.map(relativePath, sourceStatus, context);
  }

  verifyCopy(fs, false);
  // verify that we only copied new appended data
  final long bytesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue();
  Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, bytesCopied);
  final long filesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.COPY).getValue();
  Assert.assertEquals(pathList.size(), filesCopied);
}
 
源代码22 项目: hadoop   文件: TestCopyMapper.java
@Test(timeout=40000)
public void testMakeDirFailure() {
  try {
    deleteState();
    createSourceData();

    final FileSystem fs = cluster.getFileSystem();
    final CopyMapper copyMapper = new CopyMapper();
    final StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
        stubContext.getContext();

    // Use an hftp work path built from glob characters; mapping is expected
    // to fail with it.
    final String badWorkPath = new Path("hftp://localhost:1234/*/*/*/?/")
        .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
    context.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
        badWorkPath);
    copyMapper.setup(context);

    final Path firstPath = pathList.get(0);
    copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), firstPath)),
        new CopyListingFileStatus(fs.getFileStatus(firstPath)), context);

    Assert.fail("There should have been an exception.");
  } catch (Exception expected) {
    // Any Exception is the expected outcome. The AssertionError raised by
    // Assert.fail is an Error, so it is not swallowed here.
  }
}
 
源代码23 项目: hadoop   文件: TestCopyMapper.java
/**
 * Copies source files whose block size differs from the target without
 * preserving block size, and expects the failure message (two causes deep)
 * to suggest both -pb and -skipCrc.
 */
@Test(timeout=40000)
public void testCopyFailOnBlockSizeDifference() {
  try {

    deleteState();
    createSourceDataWithDifferentBlockSize();

    FileSystem fs = cluster.getFileSystem();
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
        = stubContext.getContext();

    Configuration configuration = context.getConfiguration();
    EnumSet<DistCpOptions.FileAttribute> fileAttributes
        = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
        DistCpUtils.packAttributes(fileAttributes));

    copyMapper.setup(context);

    for (Path path : pathList) {
      final FileStatus fileStatus = fs.getFileStatus(path);
      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
          new CopyListingFileStatus(fileStatus), context);
    }

    Assert.fail("Copy should have failed because of block-size difference.");
  }
  catch (Exception exception) {
    // The block-size error is expected two levels down the cause chain. Walk
    // it defensively so an unexpected exception shape fails with a clear
    // assertion (carrying the original exception) instead of an NPE.
    Throwable cause = exception.getCause();
    Throwable nested = (cause == null) ? null : cause.getCause();
    String message = (nested == null) ? null : nested.getMessage();
    if (message == null) {
      AssertionError failure = new AssertionError(
          "Expected a block-size mismatch message two causes deep, got: " + exception);
      failure.initCause(exception);
      throw failure;
    }
    // Check that the exception suggests the use of -pb/-skipCrc.
    Assert.assertTrue("Failure exception should have suggested the use of -pb.", message.contains("pb"));
    Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", message.contains("skipCrc"));
  }
}
 
源代码24 项目: big-c   文件: SimpleCopyListing.java
/**
 * Writes every entry below {@code sourceStatus} to the copy listing. The
 * tree is walked iteratively with an explicit stack rather than recursion.
 *
 * <p>Each child is converted with {@code DistCpUtils.toCopyListingFileStatus};
 * the ACL, XAttr and raw-XAttr preserve flags are passed as {@code true}
 * only for directory children. Non-empty child directories are pushed onto
 * the stack so their contents are listed in turn.
 */
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
                                       FileStatus sourceStatus,
                                       Path sourcePathRoot,
                                       DistCpOptions options)
                                       throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
  final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
  final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
  Stack<FileStatus> pathStack = new Stack<FileStatus>();
  pathStack.push(sourceStatus);

  while (!pathStack.isEmpty()) {
    for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
      // Log the entry actually being recorded. Logging sourceStatus here
      // printed the traversal root once per child.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Recording source-path: "
                  + child.getPath() + " for copy.");
      }
      final boolean isDir = child.isDirectory();
      CopyListingFileStatus childCopyListingStatus =
        DistCpUtils.toCopyListingFileStatus(sourceFS, child,
          preserveAcls && isDir,
          preserveXAttrs && isDir,
          preserveRawXattrs && isDir);
      writeToFileListing(fileListWriter, childCopyListingStatus,
           sourcePathRoot, options);
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Traversing non-empty source dir: "
                     + child.getPath());
        }
        pathStack.push(child);
      }
    }
  }
}
 
源代码25 项目: big-c   文件: DistCp.java
/**
 * Builds the DistCp job for submission. It is a map-only job running
 * {@link CopyMapper} with speculative map execution disabled, and the map
 * count is taken from the input options. All DistCp options are then
 * appended to the job configuration.
 *
 * @return Reference to job object.
 * @throws IOException - Exception if any
 */
private Job createJob() throws IOException {
  String jobName = "distcp";
  final String userChosenName = getConf().get(JobContext.JOB_NAME);
  if (userChosenName != null) {
    jobName += ": " + userChosenName;
  }

  final Job job = Job.getInstance(getConf());
  job.setJobName(jobName);
  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
  job.setJarByClass(CopyMapper.class);
  configureOutputFormat(job);

  // Map-only: CopyMapper emits Text/Text and there is no reduce phase.
  job.setMapperClass(CopyMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(CopyOutputFormat.class);

  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
  final String maxMaps = String.valueOf(inputOptions.getMaxMaps());
  job.getConfiguration().set(JobContext.NUM_MAPS, maxMaps);

  if (inputOptions.getSslConfigurationFile() != null) {
    setupSSLConfig(job);
  }

  inputOptions.appendToConf(job.getConfiguration());
  return job;
}
 
源代码26 项目: big-c   文件: RetriableFileCopyCommand.java
/**
 * Publishes copy progress as the task status, formatted as
 * "{percent}% {description} [{bytes read}/{file length}]".
 *
 * @param totalBytesRead bytes copied so far
 * @param context mapper context whose status is set
 * @param sourceFileStatus status of the file being copied; its length is
 *     the denominator of the percentage
 */
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
                                 FileStatus sourceFileStatus) {
  final long fileLength = sourceFileStatus.getLen();
  // Dividing by a zero length would yield NaN/Infinity and a garbled status,
  // so a zero-length source is reported as 100%.
  final float percentComplete =
      fileLength > 0 ? totalBytesRead * 100.0f / fileLength : 100.0f;
  StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
          .format(percentComplete));
  message.append("% ")
          .append(description).append(" [")
          .append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
          .append('/')
          .append(DistCpUtils.getStringDescriptionFor(fileLength))
          .append(']');
  context.setStatus(message.toString());
}
 
源代码27 项目: big-c   文件: UniformSizeInputFormat.java
/**
 * Implementation of InputFormat::getSplits(). Returns input splits whose
 * byte counts to be copied are approximately equal. The split count comes
 * from {@code JobContext.NUM_MAPS} and the total from
 * {@code CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED}. A split count of zero yields
 * an empty list.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException propagated from reading or computing the splits
 * @throws InterruptedException propagated from computing the splits
 */
@Override
public List<InputSplit> getSplits(JobContext context)
                    throws IOException, InterruptedException {
  final Configuration configuration = context.getConfiguration();
  final int numSplits = DistCpUtils.getInt(configuration, JobContext.NUM_MAPS);
  if (numSplits == 0) {
    return new ArrayList<InputSplit>();
  }

  final long totalBytesToCopy = DistCpUtils.getLong(configuration,
      DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED);
  return getSplits(configuration, numSplits, totalBytesToCopy);
}
 
源代码28 项目: big-c   文件: DynamicInputChunk.java
/**
 * Creates a new SequenceFileRecordReader in {@code reader} and initializes
 * it on one {@link FileSplit} covering all of {@code chunkFilePath}.
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();
  final long chunkLength = DistCpUtils.getFileSize(chunkFilePath, configuration);
  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0, chunkLength, null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
 
源代码29 项目: big-c   文件: TestCopyMapper.java
@Test
public void testCopyWithAppend() throws Exception {
  final FileSystem fs = cluster.getFileSystem();
  // First distcp pass.
  testCopy(false);
  // Append data to the source files before copying again.
  appendSourceData();

  // Second pass through a fresh mapper with the APPEND switch enabled.
  final CopyMapper copyMapper = new CopyMapper();
  final StubContext stubContext = new StubContext(getConfiguration(), null, 0);
  final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
      stubContext.getContext();
  context.getConfiguration().setBoolean(
      DistCpOptionSwitch.APPEND.getConfigLabel(), true);
  copyMapper.setup(context);
  for (Path sourceFile : pathList) {
    final Text relativePath =
        new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), sourceFile));
    final CopyListingFileStatus sourceStatus = new CopyListingFileStatus(
        cluster.getFileSystem().getFileStatus(sourceFile));
    copyMapper.map(relativePath, sourceStatus, context);
  }

  verifyCopy(fs, false);
  // verify that we only copied new appended data
  final long bytesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue();
  Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, bytesCopied);
  final long filesCopied = stubContext.getReporter()
      .getCounter(CopyMapper.Counter.COPY).getValue();
  Assert.assertEquals(pathList.size(), filesCopied);
}
 
源代码30 项目: big-c   文件: TestCopyMapper.java
@Test(timeout=40000)
public void testMakeDirFailure() {
  try {
    deleteState();
    createSourceData();

    final FileSystem fs = cluster.getFileSystem();
    final CopyMapper copyMapper = new CopyMapper();
    final StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
        stubContext.getContext();

    // Use an hftp work path built from glob characters; mapping is expected
    // to fail with it.
    final String badWorkPath = new Path("hftp://localhost:1234/*/*/*/?/")
        .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
    context.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
        badWorkPath);
    copyMapper.setup(context);

    final Path firstPath = pathList.get(0);
    copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), firstPath)),
        new CopyListingFileStatus(fs.getFileStatus(firstPath)), context);

    Assert.fail("There should have been an exception.");
  } catch (Exception expected) {
    // Any Exception is the expected outcome. The AssertionError raised by
    // Assert.fail is an Error, so it is not swallowed here.
  }
}