类org.apache.hadoop.fs.swift.util.SwiftUtils源码实例Demo

下面列出了怎么用org.apache.hadoop.fs.swift.util.SwiftUtils的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hadoop   文件: SwiftNativeOutputStream.java

/**
 * Make a single attempt to upload the local backup file as one partition.
 * <p>
 * The stream opened over the backup file is always closed before this
 * method returns, whether the upload succeeds or throws.
 *
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFilePartAttempt(int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
                        " localfile=%s of length %d  - attempt %d",
                   partNumber,
                   key,
                   backupFile,
                   uploadLen,
                   attempt);
  FileInputStream partData = new FileInputStream(backupFile);
  try {
    nativeStore.uploadFilePart(new Path(key),
                               partNumber,
                               partData,
                               uploadLen);
  } finally {
    // Release the file handle even if the store fails before consuming the
    // stream; closing an already-closed stream has no effect.
    partData.close();
  }
  return uploadLen;
}
 
源代码2 项目: hadoop   文件: SwiftNativeInputStream.java

/**
 * Read up to {@code len} bytes into {@code b} starting at {@code off}.
 * <p>
 * If the first read (or the open check) throws an {@link IOException}, the
 * failure is logged and {@code reopenBuffer()} is called; when that returns
 * true the read is retried once. An exception from the retry propagates.
 *
 * @param b   destination buffer
 * @param off offset in the buffer to start writing at
 * @param len maximum number of bytes to read
 * @return the value returned by the underlying stream's read, or -1 when the
 *         first attempt failed and the buffer was not reopened
 * @throws IOException if the retried read fails
 */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
  SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
  SwiftUtils.validateReadArgs(b, off, len);
  int bytesRead = -1;
  try {
    verifyOpen();
    bytesRead = httpStream.read(b, off, len);
  } catch (IOException e) {
    // treat the failure as transient: log it, then try once more on a
    // reopened buffer
    LOG.info("Received IOException while reading '" + path +
             "', attempting to reopen: " + e);
    LOG.debug("IOE on read()" + e, e);
    if (reopenBuffer()) {
      bytesRead = httpStream.read(b, off, len);
    }
  }
  if (bytesRead <= 0) {
    // nothing was read: no position or statistics update needed
    return bytesRead;
  }
  incPos(bytesRead);
  if (statistics != null) {
    statistics.incrementBytesRead(bytesRead);
  }
  return bytesRead;
}
 

/**
 * Upload part of a larger file.
 * <p>
 * The partition object is stored under the destination path, named by
 * {@code SwiftUtils.partitionFilenameFromNumber(partNumber)}. The parent
 * path is taken from {@code URI.getPath()} so that only the decoded path
 * component is used, without any scheme/authority or escape sequences.
 *
 * @param path        destination path
 * @param partNumber  item number in the path
 * @param inputStream input data
 * @param length      length of the data
 * @throws IOException on a problem
 */
public void uploadFilePart(Path path, int partNumber,
                           InputStream inputStream, long length)
        throws IOException {

  String stringPath = path.toUri().getPath();
  String partitionFilename = SwiftUtils.partitionFilenameFromNumber(
    partNumber);
  // insert a separator only when the parent path does not already end in one
  if (stringPath.endsWith("/")) {
    stringPath = stringPath.concat(partitionFilename);
  } else {
    stringPath = stringPath.concat("/").concat(partitionFilename);
  }

  swiftRestClient.upload(
    new SwiftObjectPath(toDirPath(path).getContainer(), stringPath),
          inputStream,
          length);
}
 

/**
 * Write a 1024-byte dataset, check that partition 1 exists and partition 2
 * does not, then delete the file and check that both the file and
 * partition 1 are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeleteSmallPartitionedFile");

  final int len1 = 1024;
  final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
  SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
  assertExists("Exists", path);

  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // a small file must not produce a second partition
  assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
 

/**
 * Write the shared {@code data} dataset (declared outside this method),
 * check that partitions 1 and 2 both exist, then delete the file and check
 * that the file and both partitions are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeletePartitionedFile");

  SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
  assertExists("Exists", path);

  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // check the second partition itself, not partition 1 again
  // NOTE(review): assumes data is large enough to span two partitions - confirm
  assertExists("Partition 0002 Exists in " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
  assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
 
源代码6 项目: big-c   文件: SwiftNativeOutputStream.java

/**
 * Make a single attempt to upload the local backup file as one partition.
 * <p>
 * The stream opened over the backup file is always closed before this
 * method returns, whether the upload succeeds or throws.
 *
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFilePartAttempt(int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
                        " localfile=%s of length %d  - attempt %d",
                   partNumber,
                   key,
                   backupFile,
                   uploadLen,
                   attempt);
  FileInputStream partData = new FileInputStream(backupFile);
  try {
    nativeStore.uploadFilePart(new Path(key),
                               partNumber,
                               partData,
                               uploadLen);
  } finally {
    // Release the file handle even if the store fails before consuming the
    // stream; closing an already-closed stream has no effect.
    partData.close();
  }
  return uploadLen;
}
 
源代码7 项目: big-c   文件: SwiftNativeInputStream.java

/**
 * Read up to {@code len} bytes into {@code b} starting at {@code off}.
 * <p>
 * If the first read (or the open check) throws an {@link IOException}, the
 * failure is logged and {@code reopenBuffer()} is called; when that returns
 * true the read is retried once. An exception from the retry propagates.
 *
 * @param b   destination buffer
 * @param off offset in the buffer to start writing at
 * @param len maximum number of bytes to read
 * @return the value returned by the underlying stream's read, or -1 when the
 *         first attempt failed and the buffer was not reopened
 * @throws IOException if the retried read fails
 */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
  SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
  SwiftUtils.validateReadArgs(b, off, len);
  int bytesRead = -1;
  try {
    verifyOpen();
    bytesRead = httpStream.read(b, off, len);
  } catch (IOException e) {
    // treat the failure as transient: log it, then try once more on a
    // reopened buffer
    LOG.info("Received IOException while reading '" + path +
             "', attempting to reopen: " + e);
    LOG.debug("IOE on read()" + e, e);
    if (reopenBuffer()) {
      bytesRead = httpStream.read(b, off, len);
    }
  }
  if (bytesRead <= 0) {
    // nothing was read: no position or statistics update needed
    return bytesRead;
  }
  incPos(bytesRead);
  if (statistics != null) {
    statistics.incrementBytesRead(bytesRead);
  }
  return bytesRead;
}
 

/**
 * Upload part of a larger file.
 * <p>
 * The partition object is stored under the destination path, named by
 * {@code SwiftUtils.partitionFilenameFromNumber(partNumber)}. The parent
 * path is taken from {@code URI.getPath()} so that only the decoded path
 * component is used, without any scheme/authority or escape sequences.
 *
 * @param path        destination path
 * @param partNumber  item number in the path
 * @param inputStream input data
 * @param length      length of the data
 * @throws IOException on a problem
 */
public void uploadFilePart(Path path, int partNumber,
                           InputStream inputStream, long length)
        throws IOException {

  String stringPath = path.toUri().getPath();
  String partitionFilename = SwiftUtils.partitionFilenameFromNumber(
    partNumber);
  // insert a separator only when the parent path does not already end in one
  if (stringPath.endsWith("/")) {
    stringPath = stringPath.concat(partitionFilename);
  } else {
    stringPath = stringPath.concat("/").concat(partitionFilename);
  }

  swiftRestClient.upload(
    new SwiftObjectPath(toDirPath(path).getContainer(), stringPath),
          inputStream,
          length);
}
 

/**
 * Write a 1024-byte dataset, check that partition 1 exists and partition 2
 * does not, then delete the file and check that both the file and
 * partition 1 are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeleteSmallPartitionedFile");

  final int len1 = 1024;
  final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
  SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
  assertExists("Exists", path);

  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // a small file must not produce a second partition
  assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
 

/**
 * Write the shared {@code data} dataset (declared outside this method),
 * check that partitions 1 and 2 both exist, then delete the file and check
 * that the file and both partitions are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeletePartitionedFile");

  SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
  assertExists("Exists", path);

  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // check the second partition itself, not partition 1 again
  // NOTE(review): assumes data is large enough to span two partitions - confirm
  assertExists("Partition 0002 Exists in " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
  assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
 

/**
 * Make a single attempt to upload the local backup file as one partition.
 * <p>
 * The stream opened over the backup file is always closed before this
 * method returns, whether the upload succeeds or throws.
 *
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFilePartAttempt(int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
                        " localfile=%s of length %d  - attempt %d",
                   partNumber,
                   key,
                   backupFile,
                   uploadLen,
                   attempt);
  FileInputStream partData = new FileInputStream(backupFile);
  try {
    nativeStore.uploadFilePart(new Path(key),
                               partNumber,
                               partData,
                               uploadLen);
  } finally {
    // Release the file handle even if the store fails before consuming the
    // stream; closing an already-closed stream has no effect.
    partData.close();
  }
  return uploadLen;
}
 

/**
 * Read up to {@code len} bytes into {@code b} starting at {@code off}.
 * <p>
 * If the first read (or the open check) throws an {@link IOException}, the
 * failure is logged and {@code reopenBuffer()} is called; when that returns
 * true the read is retried once. An exception from the retry propagates.
 *
 * @param b   destination buffer
 * @param off offset in the buffer to start writing at
 * @param len maximum number of bytes to read
 * @return the value returned by the underlying stream's read, or -1 when the
 *         first attempt failed and the buffer was not reopened
 * @throws IOException if the retried read fails
 */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
  SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
  SwiftUtils.validateReadArgs(b, off, len);
  int bytesRead = -1;
  try {
    verifyOpen();
    bytesRead = httpStream.read(b, off, len);
  } catch (IOException e) {
    // treat the failure as transient: log it, then try once more on a
    // reopened buffer
    LOG.info("Received IOException while reading '" + path +
             "', attempting to reopen: " + e);
    LOG.debug("IOE on read()" + e, e);
    if (reopenBuffer()) {
      bytesRead = httpStream.read(b, off, len);
    }
  }
  if (bytesRead <= 0) {
    // nothing was read: no position or statistics update needed
    return bytesRead;
  }
  incPos(bytesRead);
  if (statistics != null) {
    statistics.incrementBytesRead(bytesRead);
  }
  return bytesRead;
}
 

/**
 * Upload one partition of a larger file.
 * <p>
 * The object name is the path component of {@code path} followed by the
 * partition filename for {@code partNumber}, with exactly one "/" between
 * them.
 *
 * @param path        destination path
 * @param partNumber  item number in the path
 * @param inputStream input data
 * @param length      length of the data
 * @throws IOException on a problem
 */
public void uploadFilePart(Path path, int partNumber,
                           InputStream inputStream, long length)
        throws IOException {
  final String parentPath = path.toUri().getPath();
  final String partName = SwiftUtils.partitionFilenameFromNumber(partNumber);
  // add a separator only when the parent does not already end with one
  final String separator = parentPath.endsWith("/") ? "" : "/";
  final String partPath = parentPath.concat(separator).concat(partName);

  final SwiftObjectPath target =
      new SwiftObjectPath(toDirPath(path).getContainer(), partPath);
  swiftRestClient.upload(target, inputStream, length);
}
 

/**
 * Delete the object behind a file status entry from Swift.
 * <p>
 * Directories are resolved with {@code toDirPath}, everything else with
 * {@code toObjectPath}. The root directory is never deleted.
 *
 * @param status FileStatus to delete
 * @return the result of the REST client's delete call, or true when the
 *         path is the root directory and nothing was deleted
 * @throws IOException on a failure
 */
public boolean deleteObject(FileStatus status) throws IOException {
  final SwiftObjectPath target = status.isDir()
      ? toDirPath(status.getPath())
      : toObjectPath(status.getPath());
  if (SwiftUtils.isRootDir(target)) {
    // root is left in place and reported as success
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not deleting root directory entry");
    }
    return true;
  }
  return swiftRestClient.delete(target);
}
 

/**
 * Write a 1024-byte dataset, check that partition 1 exists and partition 2
 * does not, then delete the file and check that both the file and
 * partition 1 are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeleteSmallPartitionedFile");

  final int len1 = 1024;
  final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
  SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
  assertExists("Exists", path);

  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // a small file must not produce a second partition
  assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
 

/**
 * Write the shared {@code data} dataset (declared outside this method),
 * check that partitions 1 and 2 both exist, then delete the file and check
 * that the file and both partitions are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeletePartitionedFile");

  SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
  assertExists("Exists", path);

  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // check the second partition itself, not partition 1 again
  // NOTE(review): assumes data is large enough to span two partitions - confirm
  assertExists("Partition 0002 Exists in " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
  assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
 
源代码17 项目: hadoop   文件: SwiftRestClient.java

/**
 * Build the request URI for a Swift object by joining the endpoint URI with
 * the URL-encoded object path.
 * This is public for unit testing.
 *
 * @param path path to object
 * @param endpointURI domain url e.g. http://domain.com
 * @return valid URI for object
 * @throws SwiftException if the joined string is not a valid URI
 */
public static URI pathToURI(SwiftObjectPath path,
                            URI endpointURI) throws SwiftException {
  checkNotNull(endpointURI, "Null Endpoint -client is not authenticated");

  final String endpoint = endpointURI.toString();
  // starts as the bare endpoint so the error message always has a value
  String target = endpoint;
  try {
    final String encodedObjectPath = encodeUrl(path.toUriPath());
    target = SwiftUtils.joinPaths(endpoint, encodedObjectPath);
    return new URI(target);
  } catch (URISyntaxException e) {
    throw new SwiftException("Failed to create URI from " + target, e);
  }
}
 
源代码18 项目: hadoop   文件: SwiftNativeFileSystem.java

/**
 * Test whether a path refers to a file.
 *
 * @param f path to probe
 * @return true if the path has a status that {@code SwiftUtils.isDirectory}
 *         rejects; false if it is a directory or does not exist
 * @throws IOException on failures other than the path being absent
 */
@Override
public boolean isFile(Path f) throws IOException {
  try {
    return !SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a file
    return false;
  }
}
 
源代码19 项目: hadoop   文件: SwiftNativeFileSystem.java

/**
 * Test whether a path refers to a directory.
 *
 * @param f path to probe
 * @return true if the path has a status that {@code SwiftUtils.isDirectory}
 *         accepts; false if it is not a directory or does not exist
 * @throws IOException on failures other than the path being absent
 */
@Override
public boolean isDirectory(Path f) throws IOException {
  try {
    return SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a directory
    return false;
  }
}
 
源代码20 项目: hadoop   文件: SwiftNativeOutputStream.java

/**
 * Make a single attempt to upload the whole local backup file.
 * <p>
 * The stream opened over the backup file is always closed before this
 * method returns, whether the upload succeeds or throws.
 *
 * @param keypath destination path handed to the store
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFileAttempt(Path keypath, int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Closing write of file %s;" +
                        " localfile=%s of length %d - attempt %d",
                   key,
                   backupFile,
                   uploadLen,
                   attempt);

  FileInputStream fileData = new FileInputStream(backupFile);
  try {
    nativeStore.uploadFile(keypath,
                           fileData,
                           uploadLen);
  } finally {
    // Release the file handle even if the store fails before consuming the
    // stream; closing an already-closed stream has no effect.
    fileData.close();
  }
  return uploadLen;
}
 
源代码21 项目: hadoop   文件: SwiftNativeOutputStream.java

/**
 * Delete a local file, logging a warning if the deletion fails.
 * A null argument is ignored.
 *
 * @param file file to delete; may be null
 */
private void delete(File file) {
  if (file == null) {
    return;
  }
  SwiftUtils.debug(LOG, "deleting %s", file);
  final boolean removed = file.delete();
  if (!removed) {
    LOG.warn("Could not delete " + file);
  }
}
 
源代码22 项目: hadoop   文件: SwiftNativeOutputStream.java

/**
 * Write a range of bytes to the stream, uploading a partition each time the
 * current block reaches the partition size.
 *
 * @param buffer source of the data
 * @param offset index of the first byte to write
 * @param len    number of bytes to write
 * @throws IndexOutOfBoundsException if offset or len is negative, or the
 *         range extends past the end of the buffer
 * @throws IOException if the stream is not open or a write/upload fails
 */
@Override
public synchronized void write(byte[] buffer, int offset, int len) throws
                                                                   IOException {
  //validate args; compare len against the space left after offset so that
  //a large offset + len cannot overflow int and slip past the check
  if (offset < 0 || len < 0 || len > buffer.length - offset) {
    throw new IndexOutOfBoundsException("Invalid offset/length for write");
  }
  //validate the output stream
  verifyOpen();
  SwiftUtils.debug(LOG, " write(offset=%d, len=%d)", offset, len);

  // if the size of file is greater than the partition limit
  while (blockOffset + len >= filePartSize) {
    // - then partition the blob and upload as many partitions
    // are needed.
    //how many bytes to write for this partition.
    int subWriteLen = (int) (filePartSize - blockOffset);
    if (subWriteLen < 0 || subWriteLen > len) {
      throw new SwiftInternalStateException("Invalid subwrite len: "
                                            + subWriteLen
                                            + " -buffer len: " + len);
    }
    writeToBackupStream(buffer, offset, subWriteLen);
    //move the offset along and length down
    offset += subWriteLen;
    len -= subWriteLen;
    //now upload the partition that has just been filled up
    // (this also sets blockOffset=0)
    partUpload(false);
  }
  //any remaining data is now written
  writeToBackupStream(buffer, offset, len);
}
 
源代码23 项目: hadoop   文件: SwiftNativeInputStream.java

/**
 * Record a new buffer start: the position becomes {@code seekPos}, the
 * range offset is reset to zero and the content length is stored.
 * Always call from a sync'd clause.
 *
 * @param seekPos position sought.
 * @param contentLength content length provided by response (may be -1)
 */
private synchronized void updateStartOfBufferPosition(long seekPos,
                                                      long contentLength) {
  this.contentLength = contentLength;
  // the buffer now begins exactly at the seek position
  rangeOffset = 0;
  pos = seekPos;
  SwiftUtils.trace(LOG, "Move: pos=%d; bufferOffset=%d; contentLength=%d",
                   pos,
                   rangeOffset,
                   contentLength);
}
 
源代码24 项目: hadoop   文件: SwiftNativeInputStream.java

/**
 * Fetch a new range of the object starting at the target position and make
 * it the current stream.
 * If the target position == current position, the read still goes ahead;
 * this is a way of handling partial read failures.
 *
 * @param targetPos target position
 * @throws IOException IO problems on the read
 */
private void fillBuffer(long targetPos) throws IOException {
  // NOTE(review): this value is targetPos + bufferSize, yet it is logged as
  // a byte count and passed as the third getObject argument - confirm
  // whether getObject expects a length or an end offset.
  final long requested = targetPos + bufferSize;
  SwiftUtils.debug(LOG, "Fetching %d bytes starting at %d", requested, targetPos);
  final HttpBodyContent body = nativeStore.getObject(path, targetPos, requested);
  httpStream = body.getInputStream();
  updateStartOfBufferPosition(targetPos, body.getContentLength());
}
 
源代码25 项目: hadoop   文件: SwiftNativeFileSystemStore.java

/**
 * Delete the object at a path from Swift.
 * The root directory is never deleted.
 *
 * @param path path to delete
 * @return the result of the REST client's delete call, or true when the
 *         path is the root directory and nothing was deleted
 * @throws IOException on a failure
 */
public boolean deleteObject(Path path) throws IOException {
  final SwiftObjectPath target = toObjectPath(path);
  if (SwiftUtils.isRootDir(target)) {
    // root is left in place and reported as success
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not deleting root directory entry");
    }
    return true;
  }
  return swiftRestClient.delete(target);
}
 

/**
 * Write the shared {@code data} dataset (declared outside this method),
 * rename the file, then check that the destination has the same length and
 * content and that the source file and its first partition are gone.
 *
 * @throws Throwable on any failure
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testRenamePartitionedFile() throws Throwable {
  Path src = new Path("/test/testRenamePartitionedFileSrc");

  int len = data.length;
  SwiftTestUtils.writeDataset(fs, src, data, len, 1024, false);
  assertExists("Exists", src);

  String partOneName = SwiftUtils.partitionFilenameFromNumber(1);
  Path srcPart = new Path(src, partOneName);
  Path dest = new Path("/test/testRenamePartitionedFileDest");
  assertExists("Partition Exists", srcPart);
  fs.rename(src, dest);
  assertPathExists(fs, "dest file missing", dest);
  FileStatus status = fs.getFileStatus(dest);
  assertEquals("Length of renamed file is wrong", len, status.getLen());
  byte[] destData = readDataset(fs, dest, len);
  //compare data
  SwiftTestUtils.compareByteArrays(data, destData, len);
  String srcLs = SwiftTestUtils.ls(fs, src);
  // listing of the destination is kept because the call touches the
  // filesystem; its result is not used in any assertion
  String destLs = SwiftTestUtils.ls(fs, dest);

  assertPathDoesNotExist("deleted file still found in " + srcLs, src);

  assertPathDoesNotExist("partition file still found in " + srcLs, srcPart);
}
 
源代码27 项目: big-c   文件: SwiftRestClient.java

/**
 * Build the request URI for a Swift object by joining the endpoint URI with
 * the URL-encoded object path.
 * This is public for unit testing.
 *
 * @param path path to object
 * @param endpointURI domain url e.g. http://domain.com
 * @return valid URI for object
 * @throws SwiftException if the joined string is not a valid URI
 */
public static URI pathToURI(SwiftObjectPath path,
                            URI endpointURI) throws SwiftException {
  checkNotNull(endpointURI, "Null Endpoint -client is not authenticated");

  final String endpoint = endpointURI.toString();
  // starts as the bare endpoint so the error message always has a value
  String target = endpoint;
  try {
    final String encodedObjectPath = encodeUrl(path.toUriPath());
    target = SwiftUtils.joinPaths(endpoint, encodedObjectPath);
    return new URI(target);
  } catch (URISyntaxException e) {
    throw new SwiftException("Failed to create URI from " + target, e);
  }
}
 
源代码28 项目: big-c   文件: SwiftNativeFileSystem.java

/**
 * Test whether a path refers to a file.
 *
 * @param f path to probe
 * @return true if the path has a status that {@code SwiftUtils.isDirectory}
 *         rejects; false if it is a directory or does not exist
 * @throws IOException on failures other than the path being absent
 */
@Override
public boolean isFile(Path f) throws IOException {
  try {
    return !SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a file
    return false;
  }
}
 
源代码29 项目: big-c   文件: SwiftNativeFileSystem.java

/**
 * Test whether a path refers to a directory.
 *
 * @param f path to probe
 * @return true if the path has a status that {@code SwiftUtils.isDirectory}
 *         accepts; false if it is not a directory or does not exist
 * @throws IOException on failures other than the path being absent
 */
@Override
public boolean isDirectory(Path f) throws IOException {
  try {
    return SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a directory
    return false;
  }
}
 
源代码30 项目: big-c   文件: SwiftNativeOutputStream.java

/**
 * Make a single attempt to upload the whole local backup file.
 * <p>
 * The stream opened over the backup file is always closed before this
 * method returns, whether the upload succeeds or throws.
 *
 * @param keypath destination path handed to the store
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFileAttempt(Path keypath, int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Closing write of file %s;" +
                        " localfile=%s of length %d - attempt %d",
                   key,
                   backupFile,
                   uploadLen,
                   attempt);

  FileInputStream fileData = new FileInputStream(backupFile);
  try {
    nativeStore.uploadFile(keypath,
                           fileData,
                           uploadLen);
  } finally {
    // Release the file handle even if the store fails before consuming the
    // stream; closing an already-closed stream has no effect.
    fileData.close();
  }
  return uploadLen;
}
 
 类所在包
 同包方法