/**
 * Sets up an {@link HdfsDeployer} around a mocked {@link CelosCiContext} whose
 * {@code getFileSystem()} is stubbed with the result of
 * {@code LocalFileSystem.get(new Configuration())}, and prepares a file under a
 * temporary folder that stands in for the remote HDFS tree.
 *
 * @throws Exception if the temporary folder or file cannot be created
 */
public void testUndeploy() throws Exception {
    CelosCiContext context = mock(CelosCiContext.class);

    // Temporary folder playing the role of the remote HDFS root.
    File remoteHdfsFolder = tempDir.newFolder();

    doReturn(LocalFileSystem.get(new Configuration())).when(context).getFileSystem();
    File remoteDir = new File(remoteHdfsFolder, "some/hdfs/root/workflow");
    // NOTE(review): no mkdirs() on remoteDir is visible before this call, and the
    // boolean result of createNewFile() is ignored — confirm the directory exists.
    new File(remoteDir, "file").createNewFile();

    HdfsDeployer deployer = new HdfsDeployer(context);

/**
 * Prepares a capacity-scheduler configuration file on the local file system
 * from the given properties and publishes its directory through the
 * {@code MY_SCHEDULER_CONF_PATH_PROPERTY} system property.
 *
 * @param schedulerConfProps properties copied key-by-key into the generated
 *        configuration
 * @throws IOException if the configuration file cannot be created
 */
private void setUpSchedulerConfigFile(Properties schedulerConfProps)
    throws IOException {
  LocalFileSystem fs = FileSystem.getLocal(new Configuration());

  // The target directory comes from the "test.build.data" system property.
  // NOTE(review): no default is supplied, so myResourcePath may be null — confirm.
  String myResourcePath = System.getProperty("test.build.data");
  Path schedulerConfigFilePath =
      new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
  // NOTE(review): the visible lines never write to or close this stream — confirm
  // that the config is serialized to it and that it is closed.
  OutputStream out = fs.create(schedulerConfigFilePath);

  // Configuration built with loadDefaults=false, then filled only from the
  // supplied properties.
  Configuration config = new Configuration(false);
  for (Enumeration<?> e = schedulerConfProps.propertyNames(); e
      .hasMoreElements();) {
    String key = (String) e.nextElement();
    // Key and value are concatenated without a separator in this message.
    LOG.debug("Adding " + key + schedulerConfProps.getProperty(key));
    config.set(key, schedulerConfProps.getProperty(key));


  LOG.info("setting resource path where capacity-scheduler's config file "
      + "is placed to " + myResourcePath);
  // Publish the directory that holds the generated file.
  System.setProperty(MY_SCHEDULER_CONF_PATH_PROPERTY, myResourcePath);
/**
 * Creates an {@code IFile.Writer} with a gzip codec on the raw local file
 * system, opens an {@code IFile.Reader} on the same path, and checks the
 * reader's checksum stream.
 *
 * @throws Exception if any file system or IFile operation fails
 */
public void testIFileReaderWithCodec() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  // Unwrap the LocalFileSystem to get the raw file system underneath it.
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  Path path = new Path(new Path("build/test.ifile"), "data");
  DefaultCodec codec = new GzipCodec();
  FSDataOutputStream out = rfs.create(path);
  // NOTE(review): no writer.close() is visible before the file is re-opened for
  // reading — confirm the writer is closed so the file content is complete.
  IFile.Writer<Text, Text> writer =
      new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                                   codec, null);
  FSDataInputStream in = rfs.open(path);
  IFile.Reader<Text, Text> reader =
    new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
        codec, null);
  // Check the checksum: the number of bytes returned by readWithChecksum is
  // compared with the length of the checksum array.
  byte[] ab= new byte[100];
  int readed= reader.checksumIn.readWithChecksum(ab, 0, ab.length);
  // NOTE(review): the byte count is passed in the first argument position of
  // assertEquals — confirm whether that is the intended "expected" value.
  assertEquals( readed,reader.checksumIn.getChecksum().length);
/**
 * Simulate the {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} of a
 * populated DFS filesystem.
 * For each parent directory, this method populates <code>parent/dirName</code>
 * with the content of the block pool storage directory that comes from a
 * singleton datanode master (that contains version and block files). If the
 * destination directory does not exist, it will be created.  If the directory
 * already exists, it will first be deleted.
 * @param parents parent directories under which {@code dirName} is created
 * @param dirName directory under which the storage directory is created
 * @param bpid block pool id for which the storage directory is created
 * @return the array of created directories
 */
// Builds one block-pool storage directory per entry of `parents` and returns
// the created directories in the same order.
public static File[] createBlockPoolStorageDirs(String[] parents,
    String dirName, String bpid) throws Exception {
  File[] retVal = new File[parents.length];
  // Path derived from datanodeStorage and bpid via MiniDFSCluster.getBPDir.
  // NOTE(review): presumably the copy source for each new directory — the
  // visible lines do not show where it is used.
  Path bpCurDir = new Path(MiniDFSCluster.getBPDir(datanodeStorage,
      bpid, Storage.STORAGE_DIR_CURRENT));
  for (int i = 0; i < parents.length; i++) {
    // Target layout: <parent>/current/<bpid>/<dirName>.
    File newDir = new File(parents[i] + "/current/" + bpid, dirName);
    createEmptyDirs(new String[] {newDir.toString()});
    LocalFileSystem localFS = FileSystem.getLocal(new HdfsConfiguration());
    // NOTE(review): the next line is a dangling argument; the call it belongs to
    // is not visible here.
                            new Path(newDir.toString()),
    retVal[i] = newDir;
  return retVal;
/**
 * Obtains a {@link FileSystem} for {@code file:///} through a top-level shared
 * resources broker and asserts that the object underneath any decorators is a
 * {@link LocalFileSystem}.
 *
 * @throws Exception if the URI is malformed or the broker lookup fails
 */
public void testCreationWithConfigurationFSImpl() throws Exception {
  SharedResourcesBrokerImpl<SimpleScopeType> broker = SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(
      ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());

  Configuration conf = new Configuration();
  conf.set("fs.local.impl", InstrumentedLocalFileSystem.class.getName());

  // NOTE(review): the key is built from a fresh Configuration, not from `conf`,
  // so the "fs.local.impl" setting above is not passed in here — confirm intent.
  FileSystemKey key = new FileSystemKey(new URI("file:///"), new Configuration());
  FileSystemFactory<SimpleScopeType> factory = new FileSystemFactory<>();

  FileSystem fs =  broker.getSharedResource(factory, key);

  // Strip decorators before checking the concrete file system type.
  Assert.assertTrue(DecoratorUtils.resolveUnderlyingObject(fs) instanceof LocalFileSystem);
/**
 * Helper that drives {@code DiskChecker.mkdirsWithExistsAndPermissionCheck}
 * against stubbed file system objects and verifies the outcome.
 *
 * @param exists value the stubbed {@code File.exists()} returns
 * @param before permission passed to the method under test
 * @param after permission compared with {@code before} when a
 *        {@code DiskErrorException} is caught
 * @throws Throwable if stubbing or verification fails
 */
private void _mkdirs(boolean exists, FsPermission before, FsPermission after)
    throws Throwable {
  File localDir = make(stub(File.class).returning(exists).from.exists());
  Path dir = mock(Path.class); // use default stubs
  // NOTE(review): the two stub expressions below are incomplete in this view.
  LocalFileSystem fs = make(stub(LocalFileSystem.class)
  FileStatus stat = make(stub(FileStatus.class)

  try {
    DiskChecker.mkdirsWithExistsAndPermissionCheck(fs, dir, before);

    // When the directory did not exist, the permission must have been set on it.
    if (!exists)
      verify(fs).setPermission(dir, before);
    else {
  catch (DiskErrorException e) {
    // A permission mismatch is expected to surface as "Incorrect permission".
    // NOTE(review): `before != after` compares references, not values — confirm.
    if (before != after)
      assertTrue(e.getMessage().startsWith("Incorrect permission"));
/**
 * Creates a {@link HadoopFileSystem} for the given named cluster, rejecting
 * configurations that resolve to a local file system.
 *
 * @param namedCluster cluster used to create the Hadoop configuration
 * @param uri fallback URI used when the resolved file system reports no URI
 * @return a {@code HadoopFileSystemImpl} bound to {@code namedCluster}
 * @throws IOException if the resolved file system is a {@link LocalFileSystem}
 */
public HadoopFileSystem create( NamedCluster namedCluster, URI uri ) throws IOException {
  final Configuration configuration = hadoopShim.createConfiguration( namedCluster );
  FileSystem fileSystem = (FileSystem) hadoopShim.getFileSystem( configuration ).getDelegate();
  // A local file system here means the configuration did not resolve to HDFS.
  if ( fileSystem instanceof LocalFileSystem ) {
    LOGGER.error( "Got a local filesystem, was expecting an hdfs connection" );
    throw new IOException( "Got a local filesystem, was expecting an hdfs connection" );

  // Prefer the URI reported by the file system; fall back to the caller's URI.
  final URI finalUri = fileSystem.getUri() != null ? fileSystem.getUri() : uri;
  // The lambda looks the file system up each time it is invoked.
  HadoopFileSystem hadoopFileSystem = new HadoopFileSystemImpl( () -> {
    try {
      return finalUri != null
        ? (FileSystem) hadoopShim.getFileSystem( finalUri, configuration, (NamedCluster) namedCluster ).getDelegate()
        : (FileSystem) hadoopShim.getFileSystem( configuration ).getDelegate();
    } catch ( IOException | InterruptedException e ) {
      // Lookup failures are logged at debug level and reported as null.
      // NOTE(review): the interrupt flag is not restored for
      // InterruptedException — confirm that is acceptable.
      LOGGER.debug( "Error looking up/creating the file system ", e );
      return null;
  } );
  ( (HadoopFileSystemImpl) hadoopFileSystem ).setNamedCluster( namedCluster );

  return hadoopFileSystem;
/**
 * Verifies that dumping logs for a non-existent application and for a
 * non-existent container both return a non-zero exit code.
 *
 * NOTE(review): the timeout literal uses a lowercase {@code l} suffix, which is
 * easily misread as the digit 1.
 *
 * @throws Exception if the CLI invocation fails unexpectedly
 */
@Test(timeout = 5000l)
public void testFailResultCodes() throws Exception {
  Configuration conf = new YarnConfiguration();
  // Map the "file" scheme to LocalFileSystem.
  // NOTE(review): `conf` is not passed to cliHelper or dumper in the visible
  // lines — confirm it is applied.
  conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
  LogCLIHelpers cliHelper = new LogCLIHelpers();
  YarnClient mockYarnClient = createMockYarnClient(YarnApplicationState.FINISHED);
  LogsCLI dumper = new LogsCLIForTest(mockYarnClient);
  // verify dumping a non-existent application's logs returns a failure code
  int exitCode = dumper.run( new String[] {
      "-applicationId", "application_0_0" } );
  assertTrue("Should return an error code", exitCode != 0);
  // verify dumping a non-existent container log is a failure code
  exitCode = cliHelper.dumpAContainersLogs("application_0_0", "container_0_0",
      "nonexistentnode:1234", "nobody");
  assertTrue("Should return an error code", exitCode != 0);
/**
 * Creates the map output file on the raw local file system, records an index
 * entry (start offset, raw length, part length) for each partition, and
 * writes the resulting spill record to the index file.
 *
 * @param mapOutputPath destination of the map output file
 * @param indexPath destination of the spill index
 * @throws IOException if either file cannot be written
 */
private void copyPartitions(Path mapOutputPath, Path indexPath)
  throws IOException {
  FileSystem localFs = FileSystem.getLocal(jobConf);
  // Unwrap the LocalFileSystem to get the raw file system underneath it.
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  // Second argument `true` requests overwrite of an existing file.
  FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE);
  SpillRecord spillRecord = new SpillRecord(numberOfPartitions);
  // A single IndexRecord instance is reused for every partition.
  IndexRecord indexRecord = new IndexRecord();
  for (int i = 0; i < numberOfPartitions; i++) {
    indexRecord.startOffset = rawOutput.getPos();
    byte buffer[] = outStreams[i].toByteArray();
    IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput);
    // Write checksum.
    // NOTE(review): the write to checksumOutput is not visible here.
    // Write index record
    indexRecord.rawLength = (long)buffer.length;
    // Bytes written to rawOutput for this partition since startOffset.
    indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset;
    spillRecord.putIndex(indexRecord, i);
  spillRecord.writeToFile(indexPath, jobConf);
/**
 * Creates an {@code IFile.Writer} with a gzip codec on the raw local file
 * system, opens an {@code IFile.Reader} on the same path, and checks the
 * reader's checksum stream.
 *
 * @throws Exception if any file system or IFile operation fails
 */
public void testIFileReaderWithCodec() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  // Unwrap the LocalFileSystem to get the raw file system underneath it.
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  Path path = new Path(new Path("build/test.ifile"), "data");
  DefaultCodec codec = new GzipCodec();
  FSDataOutputStream out = rfs.create(path);
  // NOTE(review): no writer.close() is visible before the file is re-opened for
  // reading — confirm the writer is closed so the file content is complete.
  IFile.Writer<Text, Text> writer =
      new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                                   codec, null);
  FSDataInputStream in = rfs.open(path);
  IFile.Reader<Text, Text> reader =
    new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
        codec, null);
  // Check the checksum: the number of bytes returned by readWithChecksum is
  // compared with the length of the checksum array.
  byte[] ab= new byte[100];
  int readed= reader.checksumIn.readWithChecksum(ab, 0, ab.length);
  // NOTE(review): the byte count is passed in the first argument position of
  // assertEquals — confirm whether that is the intended "expected" value.
  assertEquals( readed,reader.checksumIn.getChecksum().length);
/**
 * Checks that {@code DataNode.checkStorageLocations} keeps only the storage
 * location whose directory check succeeds.
 *
 * @throws Throwable if stubbing or the check itself fails unexpectedly
 */
@Test (timeout = 30000)
public void testDataDirValidation() throws Throwable {
  DataNodeDiskChecker diskChecker = mock(DataNodeDiskChecker.class);
  // checkDir throws on the first two invocations and succeeds afterwards.
  doThrow(new IOException()).doThrow(new IOException()).doNothing()
    .when(diskChecker).checkDir(any(LocalFileSystem.class), any(Path.class));
  LocalFileSystem fs = mock(LocalFileSystem.class);
  AbstractList<StorageLocation> locations = new ArrayList<StorageLocation>();
  // NOTE(review): nothing is added to `locations` in the visible lines, yet the
  // assertions below expect "/p3/" to be present — confirm it is populated.


  List<StorageLocation> checkedLocations =
      DataNode.checkStorageLocations(locations, fs, diskChecker);
  assertEquals("number of valid data dirs", 1, checkedLocations.size());
  String validDir = checkedLocations.iterator().next().getFile().getPath();
  assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir));
/**
 * Simulate the {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} of a
 * populated DFS filesystem.
 * For each parent directory, this method populates <code>parent/dirName</code>
 * with the content of the block pool storage directory that comes from a
 * singleton datanode master (that contains version and block files). If the
 * destination directory does not exist, it will be created.  If the directory
 * already exists, it will first be deleted.
 * @param parents parent directories under which {@code dirName} is created
 * @param dirName directory under which the storage directory is created
 * @param bpid block pool id for which the storage directory is created
 * @return the array of created directories
 */
// Builds one block-pool storage directory per entry of `parents` and returns
// the created directories in the same order.
public static File[] createBlockPoolStorageDirs(String[] parents,
    String dirName, String bpid) throws Exception {
  File[] retVal = new File[parents.length];
  // Path derived from datanodeStorage and bpid via MiniDFSCluster.getBPDir.
  // NOTE(review): presumably the copy source for each new directory — the
  // visible lines do not show where it is used.
  Path bpCurDir = new Path(MiniDFSCluster.getBPDir(datanodeStorage,
      bpid, Storage.STORAGE_DIR_CURRENT));
  for (int i = 0; i < parents.length; i++) {
    // Target layout: <parent>/current/<bpid>/<dirName>.
    File newDir = new File(parents[i] + "/current/" + bpid, dirName);
    createEmptyDirs(new String[] {newDir.toString()});
    LocalFileSystem localFS = FileSystem.getLocal(new HdfsConfiguration());
    // NOTE(review): the next line is a dangling argument; the call it belongs to
    // is not visible here.
                            new Path(newDir.toString()),
    retVal[i] = newDir;
  return retVal;
/**
 * Writes a test file, reads it back in full, then runs the corruption check
 * twice: once targeting the {@code .crc} file and once targeting the data
 * file itself.
 *
 * @param fileSys local file system under test
 * @throws IOException if any file operation fails
 */
private void testFileCorruption(LocalFileSystem fileSys) throws IOException {
  // create a file and verify that checksum corruption results in
  // a checksum exception on LocalFS
  String dir = System.getProperty("test.build.data", ".");
  Path file = new Path(dir + "/corruption-test.dat");
  Path crcFile = new Path(dir + "/.corruption-test.dat.crc");
  writeFile(fileSys, file);
  int fileLen = (int)fileSys.getFileStatus(file).getLen();
  byte [] buf = new byte[fileLen];

  // Read the whole file once before any corruption check is run.
  // NOTE(review): `in` is not closed in the visible lines — confirm.
  InputStream in = fileSys.open(file);
  IOUtils.readFully(in, buf, 0, buf.length);
  // check .crc corruption
  checkFileCorruption(fileSys, file, crcFile);
  fileSys.delete(file, true);
  // Recreate the file so the second check starts from a fresh copy.
  writeFile(fileSys, file);
  // check data corruption
  checkFileCorruption(fileSys, file, file);
  fileSys.delete(file, true);
/**
 * Writes the given trace string to {@code fname} and, when the target file
 * system is a {@link LocalFileSystem}, removes the CRC files for that path.
 *
 * @param trace trace content to write
 * @param fname destination file name
 * @throws DMLRuntimeException wrapping any {@link IOException} raised
 */
public static void writeTraceToHDFS(String trace, String fname) {
	try {
		HDFSTool.writeStringToHDFS(trace, fname);
		FileSystem fs = IOUtilFunctions.getFileSystem(fname);
		// CRC cleanup is only done when the path lives on the local file system.
		if (fs instanceof LocalFileSystem) {
			Path path = new Path(fname);
			IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
	} catch (IOException e) {
		throw new DMLRuntimeException(e);
/**
 * Copies a local path to {@code destination} on the configured Hadoop file
 * system. Nothing is copied when that file system is itself a
 * {@link LocalFileSystem}.
 *
 * @param localPath source path on the local machine
 * @param destination target path on the Hadoop file system
 * @throws RuntimeException wrapping any exception raised during the copy
 */
public static void copyFromLocal(String localPath, String destination) {
    try {
        JobConf hadoopConfig = HdpBootstrap.hadoopConfig();
        FileSystem fs = FileSystem.get(hadoopConfig);
        if (!(fs instanceof LocalFileSystem)) {
            Path src = new Path(localPath);
            Path dst = new Path(destination);
            // delSrc=false keeps the source; overwrite=true replaces the target.
            fs.copyFromLocalFile(false, true, src, dst);
            // The message is printed after the copy has completed.
            System.out.println(String.format("Copying [%s] to [%s]", src, dst));
    } catch (Exception ex) {
        throw new RuntimeException(ex);
/**
 * Builds the factory's {@link FileSystem} from the warehouse URI in the given
 * Presto configuration, registering explicit implementations for the
 * {@code hdfs} and {@code file} schemes.
 *
 * @param prestoConfig configuration that supplies the HDFS warehouse URI
 */
public FSFactory(ParaflowPrestoConfig prestoConfig)
    config.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
    config.set("fs.file.impl", LocalFileSystem.class.getName());
    try {
        this.fileSystem = FileSystem.get(new URI(prestoConfig.getHDFSWarehouse()), config);
    catch (IOException | URISyntaxException e) {
        // Failure leaves the file system unset and is not logged or rethrown.
        // NOTE(review): callers must handle a null fileSystem — confirm.
        this.fileSystem = null;
/**
 * Initializes {@code fs} from {@code fsWorkingPath}, using a copy of the
 * supplied configuration with the DFS client retry policy enabled.
 *
 * @param conf base configuration; it is copied, not modified
 * @throws IOException if the file system cannot be obtained
 */
private void setFileSystem(Configuration conf) throws IOException {
  // Work on a copy so the caller's configuration is left untouched.
  Configuration confCopy = new Configuration(conf);
  confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
  // NOTE(review): the right-hand side of this assignment is missing in this view.
  String retryPolicy =
  confCopy.set("dfs.client.retry.policy.spec", retryPolicy);
  fs = fsWorkingPath.getFileSystem(confCopy);
  // if it's local file system, use RawLocalFileSystem instead of
  // LocalFileSystem, the latter one doesn't support append.
  // NOTE(review): the cast relies on every "file"-scheme file system being a
  // LocalFileSystem — confirm no other implementation can be configured.
  if (fs.getScheme().equals("file")) {
    fs = ((LocalFileSystem)fs).getRaw();
/**
 * Recursively deletes the parent of {@code hdfsBaseDir} from both {@code hdfs}
 * and the local file system.
 *
 * @throws IOException if either delete fails
 */
public static void tearDown() throws IOException {
    Path testDir = new Path(hdfsBaseDir.getParent());
    hdfs.delete(testDir, true);
    // The same path is also removed from the local file system.
    LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
    localFileSystem.delete(testDir, true);
/**
 * Expects an {@link IllegalStateException} when the context's deploy
 * directory does not exist.
 *
 * @throws Exception if the mock setup fails
 */
@Test(expected = IllegalStateException.class)
public void testDeployThrowsExceptionNoDir() throws Exception {
    CelosCiContext context = mock(CelosCiContext.class);
    HdfsDeployer deployer = new HdfsDeployer(context);

    doReturn(LocalFileSystem.get(new Configuration())).when(context).getFileSystem();
    // A random UUID suffix is used so the "nodir..." path should not already exist.
    doReturn(new File("nodir" + UUID.randomUUID())).when(context).getDeployDir();

/**
 * Creates {@code dirName} under each parent directory and populates it from
 * {@code srcFile} according to the node type.
 *
 * @param nodeType selects the name-node or data-node copy branch
 * @param parents parent directories under which {@code dirName} is created
 * @param dirName name of the directory created under each parent
 * @param srcFile source directory whose sub-directories are copied
 * @return the created directories, in the order of {@code parents}
 * @throws Exception if directory creation or copying fails
 */
public static File[] createStorageDirs(NodeType nodeType, String[] parents, String dirName,
    File srcFile) throws Exception {
  File[] retVal = new File[parents.length];
  for (int i = 0; i < parents.length; i++) {
    File newDir = new File(parents[i], dirName);
    createEmptyDirs(new String[] {newDir.toString()});
    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
    // NOTE(review): several copy calls below are incomplete in this view, and no
    // break between the two cases is visible.
    switch (nodeType) {
    case NAME_NODE:
      // Copy <srcFile>/current into the new directory.
      localFS.copyToLocalFile(new Path(srcFile.toString(), "current"),
                              new Path(newDir.toString()),
      // The "image" directory sits next to newDir and is handled only if absent.
      Path newImgDir = new Path(newDir.getParent(), "image");
      if (!localFS.exists(newImgDir))
            new Path(srcFile.toString(), "image"),
    case DATA_NODE:
      // Copy <srcFile>/current into the new directory.
      localFS.copyToLocalFile(new Path(srcFile.toString(), "current"),
                              new Path(newDir.toString()),
      // The "storage" file sits next to newDir and is handled only if absent.
      Path newStorageFile = new Path(newDir.getParent(), "storage");
      if (!localFS.exists(newStorageFile))
            new Path(srcFile.toString(), "storage"),
    retVal[i] = newDir;
  return retVal;
/**
 * Renames a src {@link Path} on fs {@link FileSystem} to a dst {@link Path}. If fs is a {@link LocalFileSystem} and
 * src is a directory, then {@link File#renameTo} is called directly to avoid a directory rename race condition where
 * {@link org.apache.hadoop.fs.RawLocalFileSystem#rename} copies the conflicting src directory into dst, resulting in
 * an extra nested level, such as /root/a/b/c/e/e where e is repeated.
 * @param fs the {@link FileSystem} where the src {@link Path} exists
 * @param src the source {@link Path} which will be renamed
 * @param dst the {@link Path} to rename to
 * @return true if rename succeeded, false if rename failed.
 * @throws IOException if rename failed for reasons other than target exists.
 */
// Renames src to dst. A directory on a LocalFileSystem is renamed through
// java.io.File; everything else goes through FileSystem.rename.
public static boolean renamePathHandleLocalFSRace(FileSystem fs, Path src, Path dst) throws IOException {
  // Decorators are stripped first so a wrapped LocalFileSystem is recognised.
  if (DecoratorUtils.resolveUnderlyingObject(fs) instanceof LocalFileSystem && fs.isDirectory(src)) {
    LocalFileSystem localFs = (LocalFileSystem) DecoratorUtils.resolveUnderlyingObject(fs);
    // Translate the Hadoop paths into java.io.File handles.
    File srcFile = localFs.pathToFile(src);
    File dstFile = localFs.pathToFile(dst);

    // File.renameTo reports failure through its boolean result.
    return srcFile.renameTo(dstFile);
  else {
    return fs.rename(src, dst);
/**
 * Creates a fetcher for the given URI, resolving defaults for scheme, host and
 * port, and prepares the Netty {@code Bootstrap} used for remote fetches.
 *
 * @param conf Tajo configuration supplying URL length and timeout settings
 * @param uri URI to fetch from; missing scheme, host or port are defaulted
 * @param tableName name of the table being fetched
 * @throws IOException declared by the constructor; the visible lines do not
 *         show which call raises it
 */
public LocalFetcher(TajoConf conf, URI uri, String tableName) throws IOException {
  super(conf, uri);
  this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
  this.tableName = tableName;
  // NOTE(review): the LocalFileSystem is instantiated directly and no
  // initialize(...) call is visible — confirm it is initialized before use.
  this.localFileSystem = new LocalFileSystem();
  this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
  // Defaults: scheme "http" and host "localhost" when absent from the URI.
  String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
  this.host = uri.getHost() == null ? "localhost" : uri.getHost();
  this.port = uri.getPort();
  // URI.getPort() returns -1 when no port is given; pick the scheme's default.
  if (port == -1) {
    if (scheme.equalsIgnoreCase("http")) {
      this.port = 80;
    } else if (scheme.equalsIgnoreCase("https")) {
      this.port = 443;

  // NOTE(review): the option that takes the connect timeout below is
  // incomplete in this view. The config value is multiplied by 1000.
  bootstrap = new Bootstrap()
      .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
          conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
      .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
      .option(ChannelOption.TCP_NODELAY, true);
/**
 * Obtains a {@link FileSystem} for the {@code instrumented-file:///} URI
 * through a top-level shared resources broker and asserts that the object
 * underneath any decorators is a {@link LocalFileSystem}.
 *
 * @throws Exception if the URI is malformed or the broker lookup fails
 */
public void testCreationWithInstrumentedScheme() throws Exception {
  SharedResourcesBrokerImpl<SimpleScopeType> broker = SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(
      ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());

  FileSystemKey key = new FileSystemKey(new URI("instrumented-file:///"), new Configuration());
  FileSystemFactory<SimpleScopeType> factory = new FileSystemFactory<>();

  FileSystem fs =  broker.getSharedResource(factory, key);

  // Strip decorators before checking the concrete file system type.
  Assert.assertTrue(DecoratorUtils.resolveUnderlyingObject(fs) instanceof LocalFileSystem);
/**
 * Get the path to a local file.
 * @return File representing the local path
 * @throws IllegalArgumentException if this.fs is not the LocalFileSystem
 */
// Converts this object's path into a java.io.File; only valid on a
// LocalFileSystem.
public File toFile() {
  // Reject any non-local file system up front; the message includes the path.
  if (!(fs instanceof LocalFileSystem)) {
     throw new IllegalArgumentException("Not a local path: " + path);
  return ((LocalFileSystem)fs).pathToFile(path);
/**
 * Opens {@code dstPath} for writing, by append or by create, then builds the
 * event serializer on top of the resulting stream.
 *
 * @param conf configuration consulted for {@code hdfs.append.support}
 * @param dstPath file to open
 * @param hdfs file system holding {@code dstPath}; may be replaced by its raw
 *        local file system when {@code useRawLocalFileSystem} is set
 * @throws IOException if the file cannot be opened, or if appending is
 *         requested but the serializer does not support reopen
 */
protected void doOpen(Configuration conf,
  Path dstPath, FileSystem hdfs) throws
  IOException {
  // Optionally bypass the LocalFileSystem wrapper and use its raw file system.
  if(useRawLocalFileSystem) {
    if(hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem)hdfs).getRaw();
    } else {
      // NOTE(review): the warning message is incomplete in this view.
      logger.warn("useRawLocalFileSystem is set to true but file system " +
  boolean appending = false;
  // Append only when it is enabled in conf AND dstPath is an existing file.
  if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
          (dstPath)) {
    outStream = hdfs.append(dstPath);
    appending = true;
  } else {
    outStream = hdfs.create(dstPath);

  serializer = EventSerializerFactory.getInstance(
      serializerType, serializerContext, outStream);
  // A serializer that cannot reopen a file cannot be used for appending.
  // NOTE(review): outStream is not closed in the visible lines before the throw.
  if (appending && !serializer.supportsReopen()) {
    serializer = null;
    throw new IOException("serializer (" + serializerType +
  // must call superclass to check for replication issues
  registerCurrentStream(outStream, hdfs, dstPath);

  // NOTE(review): both branch bodies are missing in this view.
  if (appending) {
  } else {
/**
 * Runs {@code testChecker} with both flag values, first against a
 * {@link MiniDFSCluster} file system and then against the local file system.
 *
 * @throws Exception if the cluster cannot be started or a checker run fails
 */
public void testFSInputChecker() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong("dfs.block.size", BLOCK_SIZE);
  conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);

  // test DFS
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem();
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
  } finally {
  // NOTE(review): the cleanup inside both finally blocks is not visible here.
  // test Local FS
  fileSys = FileSystem.getLocal(conf);
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
  }finally {
/**
 * Initializes {@code fs} from {@code fsWorkingPath}, using a copy of the
 * supplied configuration with the DFS client retry policy enabled.
 *
 * @param conf base configuration; it is copied, not modified
 * @throws IOException if the file system cannot be obtained
 */
private void setFileSystem(Configuration conf) throws IOException {
  // Work on a copy so the caller's configuration is left untouched.
  Configuration confCopy = new Configuration(conf);
  confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
  // NOTE(review): the right-hand side of this assignment is missing in this view.
  String retryPolicy =
  confCopy.set("dfs.client.retry.policy.spec", retryPolicy);
  fs = fsWorkingPath.getFileSystem(confCopy);
  // if it's local file system, use RawLocalFileSystem instead of
  // LocalFileSystem, the latter one doesn't support append.
  // NOTE(review): the cast relies on every "file"-scheme file system being a
  // LocalFileSystem — confirm no other implementation can be configured.
  if (fs.getScheme().equals("file")) {
    fs = ((LocalFileSystem)fs).getRaw();
/**
 * Copies {@code src} on {@code srcFs} to {@code dst} on {@code dstFs}, and
 * attempts to delete {@code dst} if the copy fails.
 *
 * @param srcFs file system holding the source
 * @param src source path
 * @param dstFs file system receiving the copy
 * @param dst destination path
 * @param deleteSource whether the source is removed after a successful copy
 * @param overwrite whether an existing destination may be replaced
 * @param conf configuration passed to {@code FileUtil.copy}
 * @throws IOException if the copy fails
 */
private static void copyPath(FileSystem srcFs, Path src, FileSystem dstFs, Path dst, boolean deleteSource,
    boolean overwrite, Configuration conf) throws IOException {

  // NOTE(review): the opening of the first precondition check is missing in
  // this view; only its message argument remains.
      String.format("Cannot copy from %s to %s because src does not exist", src, dst));
  // Refuse to clobber an existing destination unless overwrite is requested.
  Preconditions.checkArgument(overwrite || !dstFs.exists(dst),
      String.format("Cannot copy from %s to %s because dst exists", src, dst));

  try {
    // Local sources (wrapped or raw) are copied with copyFromLocalFile.
    boolean isSourceFileSystemLocal = srcFs instanceof LocalFileSystem || srcFs instanceof RawLocalFileSystem;
    if (isSourceFileSystemLocal) {
      try {
        dstFs.copyFromLocalFile(deleteSource, overwrite, src, dst);
      } catch (IOException e) {
        // Re-wrap with both paths in the message, keeping the original cause.
        throw new IOException(String.format("Failed to copy %s to %s", src, dst), e);
    } else if (!FileUtil.copy(srcFs, src, dstFs, dst, deleteSource, overwrite, conf)) {
      // FileUtil.copy reports failure through a false return value.
      throw new IOException(String.format("Failed to copy %s to %s", src, dst));
  } catch (Throwable t1) {
    // Best-effort cleanup of a partial destination; cleanup errors are ignored.
    // NOTE(review): whether t1 is rethrown is not visible here.
    try {
      deleteIfExists(dstFs, dst, true);
    } catch (Throwable t2) {
      // Do nothing
/**
 * Returns an identifier for the file system addressed by {@code uri}:
 * {@code "localhost"} when the URI's scheme equals the scheme reported by
 * {@link LocalFileSystem}, otherwise the cluster name resolved by
 * {@code ClustersNames}.
 *
 * @param uri file system URI; a null scheme falls through to the cluster lookup
 * @return {@code "localhost"} or the resolved cluster name
 */
private static String getFSIdentifier(URI uri) {
  // A new LocalFileSystem is created on each call just to read its scheme.
  if (new LocalFileSystem().getScheme().equals(uri.getScheme())) {
    return "localhost";
  } else {
    return ClustersNames.getInstance().getClusterName(uri.toString());
/**
 * Test fixture setup: obtains the local file system, prepares the test
 * directory on it, and asserts that the file system is a
 * {@link LocalFileSystem}.
 *
 * @throws Exception if the file system or test directory cannot be set up
 */
public void before() throws Exception {
  // Make sure testDir is on LocalFileSystem
  this.fs = FileSystem.getLocal(this.testUtil.getConfiguration());
  this.testDir = TestExportSnapshotV1NoCluster.setup(this.fs, this.testUtil);
  LOG.info("fs={}, testDir={}", this.fs, this.testDir);
  // This check runs after setup(...) has already used the file system.
  assertTrue("FileSystem '" + fs + "' is not local", fs instanceof LocalFileSystem);