下面列出了怎么用org.apache.hadoop.fs.swift.util.SwiftUtils的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Make one attempt to upload the local backup file as a single partition.
 *
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFilePartAttempt(int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
      " localfile=%s of length %d - attempt %d",
      partNumber,
      key,
      backupFile,
      uploadLen,
      attempt);
  // Own the stream in this method so the file handle is released even when
  // the upload throws. Closing a stream that is already closed has no effect,
  // so this stays safe if the store closes the stream itself.
  // NOTE(review): assumes uploadFilePart() has finished reading the stream
  // by the time it returns - confirm against the store implementation.
  try (FileInputStream partStream = new FileInputStream(backupFile)) {
    nativeStore.uploadFilePart(new Path(key),
        partNumber,
        partStream,
        uploadLen);
  }
  return uploadLen;
}
/**
 * Read up to {@code len} bytes into {@code b}, starting at index {@code off}.
 * An IOException raised by the open check or by the first read is logged and
 * followed by a call to {@code reopenBuffer()}; when that returns true the
 * read is retried once, otherwise -1 is returned. An exception raised by the
 * retry itself propagates to the caller.
 *
 * @param b destination buffer
 * @param off offset in the buffer at which to store data
 * @param len maximum number of bytes to read
 * @return the value returned by the underlying stream, or -1 when the read
 *         failed and the buffer was not reopened
 * @throws IOException if the retried read fails
 */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
  SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
  SwiftUtils.validateReadArgs(b, off, len);
  int bytesRead;
  try {
    verifyOpen();
    bytesRead = httpStream.read(b, off, len);
  } catch (IOException e) {
    // any IOException is treated as transient: log it, then retry once
    LOG.info("Received IOException while reading '" + path +
        "', attempting to reopen: " + e);
    LOG.debug("IOE on read()" + e, e);
    bytesRead = reopenBuffer() ? httpStream.read(b, off, len) : -1;
  }
  if (bytesRead <= 0) {
    // nothing was read: position and statistics stay untouched
    return bytesRead;
  }
  incPos(bytesRead);
  if (statistics != null) {
    statistics.incrementBytesRead(bytesRead);
  }
  return bytesRead;
}
/**
 * Upload part of a larger file. The part is stored below the destination
 * path, under the name produced by
 * {@code SwiftUtils.partitionFilenameFromNumber(partNumber)}.
 *
 * @param path destination path
 * @param partNumber item number in the path
 * @param inputStream input data
 * @param length length of the data
 * @throws IOException on a problem
 */
public void uploadFilePart(Path path, int partNumber,
    InputStream inputStream, long length)
    throws IOException {
  // Only the path component may go into the object name: toUri().toString()
  // would also embed the scheme and authority of a fully qualified path.
  String stringPath = path.toUri().getPath();
  String partitionFilename = SwiftUtils.partitionFilenameFromNumber(
      partNumber);
  if (stringPath.endsWith("/")) {
    stringPath = stringPath.concat(partitionFilename);
  } else {
    stringPath = stringPath.concat("/").concat(partitionFilename);
  }
  swiftRestClient.upload(
      new SwiftObjectPath(toDirPath(path).getContainer(), stringPath),
      inputStream,
      length);
}
/**
 * Write a 1024 byte dataset, check that partition 1 exists and partition 2
 * does not, then delete the file and check that the file and partition 1
 * are gone.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeleteSmallPartitionedFile");
  final int len1 = 1024;
  final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
  SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
  assertExists("Exists", path);
  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // A third, copy-pasted assertion used to re-check part_0001 under a
  // "Partition 0002 Exists" message, contradicting this check; it was dropped.
  assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
/**
 * Write the shared {@code data} array as a partitioned file, check that
 * partitions 1 and 2 both exist, then delete the file and check that the
 * file and both partitions are gone.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeletePartitionedFile");
  SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
  assertExists("Exists", path);
  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // this assertion used to re-check part_0001, so partition 2 was never verified
  assertExists("Partition 0002 Exists in " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
  assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
/**
 * Make one attempt to upload the local backup file as a single partition.
 *
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFilePartAttempt(int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
      " localfile=%s of length %d - attempt %d",
      partNumber,
      key,
      backupFile,
      uploadLen,
      attempt);
  // Own the stream in this method so the file handle is released even when
  // the upload throws. Closing a stream that is already closed has no effect,
  // so this stays safe if the store closes the stream itself.
  // NOTE(review): assumes uploadFilePart() has finished reading the stream
  // by the time it returns - confirm against the store implementation.
  try (FileInputStream partStream = new FileInputStream(backupFile)) {
    nativeStore.uploadFilePart(new Path(key),
        partNumber,
        partStream,
        uploadLen);
  }
  return uploadLen;
}
/**
 * Read up to {@code len} bytes into {@code b}, starting at index {@code off}.
 * An IOException raised by the open check or by the first read is logged and
 * followed by a call to {@code reopenBuffer()}; when that returns true the
 * read is retried once, otherwise -1 is returned. An exception raised by the
 * retry itself propagates to the caller.
 *
 * @param b destination buffer
 * @param off offset in the buffer at which to store data
 * @param len maximum number of bytes to read
 * @return the value returned by the underlying stream, or -1 when the read
 *         failed and the buffer was not reopened
 * @throws IOException if the retried read fails
 */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
  SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
  SwiftUtils.validateReadArgs(b, off, len);
  int bytesRead;
  try {
    verifyOpen();
    bytesRead = httpStream.read(b, off, len);
  } catch (IOException e) {
    // any IOException is treated as transient: log it, then retry once
    LOG.info("Received IOException while reading '" + path +
        "', attempting to reopen: " + e);
    LOG.debug("IOE on read()" + e, e);
    bytesRead = reopenBuffer() ? httpStream.read(b, off, len) : -1;
  }
  if (bytesRead <= 0) {
    // nothing was read: position and statistics stay untouched
    return bytesRead;
  }
  incPos(bytesRead);
  if (statistics != null) {
    statistics.incrementBytesRead(bytesRead);
  }
  return bytesRead;
}
/**
 * Upload part of a larger file. The part is stored below the destination
 * path, under the name produced by
 * {@code SwiftUtils.partitionFilenameFromNumber(partNumber)}.
 *
 * @param path destination path
 * @param partNumber item number in the path
 * @param inputStream input data
 * @param length length of the data
 * @throws IOException on a problem
 */
public void uploadFilePart(Path path, int partNumber,
    InputStream inputStream, long length)
    throws IOException {
  // Only the path component may go into the object name: toUri().toString()
  // would also embed the scheme and authority of a fully qualified path.
  String stringPath = path.toUri().getPath();
  String partitionFilename = SwiftUtils.partitionFilenameFromNumber(
      partNumber);
  if (stringPath.endsWith("/")) {
    stringPath = stringPath.concat(partitionFilename);
  } else {
    stringPath = stringPath.concat("/").concat(partitionFilename);
  }
  swiftRestClient.upload(
      new SwiftObjectPath(toDirPath(path).getContainer(), stringPath),
      inputStream,
      length);
}
/**
 * Write a 1024 byte dataset, check that partition 1 exists and partition 2
 * does not, then delete the file and check that the file and partition 1
 * are gone.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeleteSmallPartitionedFile");
  final int len1 = 1024;
  final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
  SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
  assertExists("Exists", path);
  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // A third, copy-pasted assertion used to re-check part_0001 under a
  // "Partition 0002 Exists" message, contradicting this check; it was dropped.
  assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
/**
 * Write the shared {@code data} array as a partitioned file, check that
 * partitions 1 and 2 both exist, then delete the file and check that the
 * file and both partitions are gone.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeletePartitionedFile");
  SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
  assertExists("Exists", path);
  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // this assertion used to re-check part_0001, so partition 2 was never verified
  assertExists("Partition 0002 Exists in " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
  assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
/**
 * Make one attempt to upload the local backup file as a single partition.
 *
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFilePartAttempt(int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
      " localfile=%s of length %d - attempt %d",
      partNumber,
      key,
      backupFile,
      uploadLen,
      attempt);
  // Own the stream in this method so the file handle is released even when
  // the upload throws. Closing a stream that is already closed has no effect,
  // so this stays safe if the store closes the stream itself.
  // NOTE(review): assumes uploadFilePart() has finished reading the stream
  // by the time it returns - confirm against the store implementation.
  try (FileInputStream partStream = new FileInputStream(backupFile)) {
    nativeStore.uploadFilePart(new Path(key),
        partNumber,
        partStream,
        uploadLen);
  }
  return uploadLen;
}
/**
 * Read up to {@code len} bytes into {@code b}, starting at index {@code off}.
 * An IOException raised by the open check or by the first read is logged and
 * followed by a call to {@code reopenBuffer()}; when that returns true the
 * read is retried once, otherwise -1 is returned. An exception raised by the
 * retry itself propagates to the caller.
 *
 * @param b destination buffer
 * @param off offset in the buffer at which to store data
 * @param len maximum number of bytes to read
 * @return the value returned by the underlying stream, or -1 when the read
 *         failed and the buffer was not reopened
 * @throws IOException if the retried read fails
 */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
  SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
  SwiftUtils.validateReadArgs(b, off, len);
  int bytesRead;
  try {
    verifyOpen();
    bytesRead = httpStream.read(b, off, len);
  } catch (IOException e) {
    // any IOException is treated as transient: log it, then retry once
    LOG.info("Received IOException while reading '" + path +
        "', attempting to reopen: " + e);
    LOG.debug("IOE on read()" + e, e);
    bytesRead = reopenBuffer() ? httpStream.read(b, off, len) : -1;
  }
  if (bytesRead <= 0) {
    // nothing was read: position and statistics stay untouched
    return bytesRead;
  }
  incPos(bytesRead);
  if (statistics != null) {
    statistics.incrementBytesRead(bytesRead);
  }
  return bytesRead;
}
/**
 * Upload one partition of a larger file. The partition is stored below the
 * path component of {@code path}, under the name produced by
 * {@code SwiftUtils.partitionFilenameFromNumber(partNumber)}; a "/" separator
 * is inserted only when the path does not already end with one.
 *
 * @param path destination path
 * @param partNumber number of the partition being uploaded
 * @param inputStream source of the partition data
 * @param length number of bytes to upload
 * @throws IOException on a problem
 */
public void uploadFilePart(Path path, int partNumber,
    InputStream inputStream, long length)
    throws IOException {
  final String parentName = path.toUri().getPath();
  final String partName = SwiftUtils.partitionFilenameFromNumber(partNumber);
  final String separator = parentName.endsWith("/") ? "" : "/";
  final String partObjectName = parentName + separator + partName;
  final SwiftObjectPath target =
      new SwiftObjectPath(toDirPath(path).getContainer(), partObjectName);
  swiftRestClient.upload(target, inputStream, length);
}
/**
 * Delete the object described by a file status. Directories are resolved
 * with {@code toDirPath}, everything else with {@code toObjectPath}.
 * The root directory is never deleted; for it the method only logs at
 * debug level and reports success.
 *
 * @param status FileStatus to delete
 * @return the result of the delete request, or true for the root directory
 * @throws IOException on a failure
 */
public boolean deleteObject(FileStatus status) throws IOException {
  final SwiftObjectPath target = status.isDir()
      ? toDirPath(status.getPath())
      : toObjectPath(status.getPath());
  if (SwiftUtils.isRootDir(target)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not deleting root directory entry");
    }
    return true;
  }
  return swiftRestClient.delete(target);
}
/**
 * Write a 1024 byte dataset, check that partition 1 exists and partition 2
 * does not, then delete the file and check that the file and partition 1
 * are gone.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeleteSmallPartitionedFile");
  final int len1 = 1024;
  final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
  SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
  assertExists("Exists", path);
  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // A third, copy-pasted assertion used to re-check part_0001 under a
  // "Partition 0002 Exists" message, contradicting this check; it was dropped.
  assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
/**
 * Write the shared {@code data} array as a partitioned file, check that
 * partitions 1 and 2 both exist, then delete the file and check that the
 * file and both partitions are gone.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
  final Path path = new Path("/test/testDeletePartitionedFile");
  SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
  assertExists("Exists", path);
  Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
  Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
  String ls = SwiftTestUtils.ls(fs, path);
  assertExists("Partition 0001 Exists in " + ls, part_0001);
  // this assertion used to re-check part_0001, so partition 2 was never verified
  assertExists("Partition 0002 Exists in " + ls, part_0002);
  fs.delete(path, false);
  assertPathDoesNotExist("deleted file still there", path);
  ls = SwiftTestUtils.ls(fs, path);
  assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
  assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
/**
 * Build the request URI for a Swift object by joining the endpoint URI with
 * the encoded URI path of the object.
 * This is public for unit testing.
 *
 * @param path path to object
 * @param endpointURI domain url e.g. http://domain.com; must not be null
 * @return valid URI for object
 * @throws SwiftException if the joined string is not a syntactically valid URI
 */
public static URI pathToURI(SwiftObjectPath path,
    URI endpointURI) throws SwiftException {
  checkNotNull(endpointURI, "Null Endpoint -client is not authenticated");
  // starts out as the bare endpoint and becomes the full object location,
  // so the error message below reports whichever value was reached
  String location = endpointURI.toString();
  try {
    final String encodedObjectPath = encodeUrl(path.toUriPath());
    location = SwiftUtils.joinPaths(location, encodedObjectPath);
    return new URI(location);
  } catch (URISyntaxException e) {
    throw new SwiftException("Failed to create URI from " + location, e);
  }
}
/**
 * Test whether a path refers to a file: its status can be retrieved and
 * {@code SwiftUtils.isDirectory} does not classify that status as a directory.
 *
 * @param f path to probe
 * @return false if the path does not exist or is classified as a directory
 * @throws IOException on failures other than the path being missing
 */
@Override
public boolean isFile(Path f) throws IOException {
  try {
    return !SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a file
    return false;
  }
}
/**
 * Test whether a path refers to a directory: its status can be retrieved and
 * {@code SwiftUtils.isDirectory} classifies that status as a directory.
 *
 * @param f path to probe
 * @return false if the path does not exist or is not classified as a directory
 * @throws IOException on failures other than the path being missing
 */
@Override
public boolean isDirectory(Path f) throws IOException {
  try {
    return SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a directory
    return false;
  }
}
/**
 * Make one attempt to upload the whole local backup file to the given path.
 *
 * @param keypath destination path of the upload
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFileAttempt(Path keypath, int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Closing write of file %s;" +
      " localfile=%s of length %d - attempt %d",
      key,
      backupFile,
      uploadLen,
      attempt);
  // Own the stream in this method so the file handle is released even when
  // the upload throws. Closing a stream that is already closed has no effect,
  // so this stays safe if the store closes the stream itself.
  // NOTE(review): assumes uploadFile() has finished reading the stream by
  // the time it returns - confirm against the store implementation.
  try (FileInputStream fileStream = new FileInputStream(backupFile)) {
    nativeStore.uploadFile(keypath,
        fileStream,
        uploadLen);
  }
  return uploadLen;
}
/**
 * Delete a local file, logging a warning if the deletion fails.
 * A null argument is ignored.
 *
 * @param file file to delete; may be null
 */
private void delete(File file) {
  if (file == null) {
    return;
  }
  SwiftUtils.debug(LOG, "deleting %s", file);
  final boolean deleted = file.delete();
  if (!deleted) {
    LOG.warn("Could not delete " + file);
  }
}
/**
 * Write a range of a buffer to the backup stream, uploading a partition
 * every time the data written reaches the partition size.
 *
 * @param buffer source of the data
 * @param offset index of the first byte to write
 * @param len number of bytes to write
 * @throws IndexOutOfBoundsException if offset or len is negative, or the
 *         range extends past the end of the buffer
 * @throws IOException if the stream is not open or a write/upload fails
 */
@Override
public synchronized void write(byte[] buffer, int offset, int len) throws
    IOException {
  //validate args; written as a subtraction so that a large offset + len
  //cannot overflow int and slip past the check
  if (offset < 0 || len < 0 || len > buffer.length - offset) {
    throw new IndexOutOfBoundsException("Invalid offset/length for write");
  }
  //validate the output stream
  verifyOpen();
  SwiftUtils.debug(LOG, " write(offset=%d, len=%d)", offset, len);
  // if the size of file is greater than the partition limit
  while (blockOffset + len >= filePartSize) {
    // - then partition the blob and upload as many partitions
    // are needed.
    //how many bytes to write for this partition.
    int subWriteLen = (int) (filePartSize - blockOffset);
    if (subWriteLen < 0 || subWriteLen > len) {
      throw new SwiftInternalStateException("Invalid subwrite len: "
          + subWriteLen
          + " -buffer len: " + len);
    }
    writeToBackupStream(buffer, offset, subWriteLen);
    //move the offset along and length down
    offset += subWriteLen;
    len -= subWriteLen;
    //now upload the partition that has just been filled up
    // (this also sets blockOffset=0)
    partUpload(false);
  }
  //any remaining data is now written
  writeToBackupStream(buffer, offset, len);
}
/**
 * Record a new buffer start: the stream position becomes {@code seekPos},
 * the offset within the range is reset to zero and the content length is
 * stored. The new values are then logged at trace level.
 *
 * @param seekPos position sought.
 * @param contentLength content length provided by response (may be -1)
 */
private synchronized void updateStartOfBufferPosition(long seekPos,
    long contentLength) {
  this.pos = seekPos;
  this.rangeOffset = 0;
  this.contentLength = contentLength;
  SwiftUtils.trace(LOG, "Move: pos=%d; bufferOffset=%d; contentLength=%d",
      this.pos, this.rangeOffset, contentLength);
}
/**
 * Request object data from the store starting at the target position,
 * replace the current HTTP stream with the response body and reset the
 * buffer bookkeeping to that position.
 * The request is issued even when the target position equals the current
 * position; this is a way of handling partial read failures.
 *
 * @param targetPos target position
 * @throws IOException IO problems on the read
 */
private void fillBuffer(long targetPos) throws IOException {
  // NOTE(review): the value logged as a byte count and passed to getObject()
  // is targetPos + bufferSize rather than bufferSize - confirm this matches
  // what getObject() expects as its third argument.
  final long fetchLength = targetPos + bufferSize;
  SwiftUtils.debug(LOG, "Fetching %d bytes starting at %d", fetchLength, targetPos);
  final HttpBodyContent body = nativeStore.getObject(path, targetPos, fetchLength);
  httpStream = body.getInputStream();
  updateStartOfBufferPosition(targetPos, body.getContentLength());
}
/**
 * Delete the object at a path. The root directory is never deleted; for it
 * the method only logs at debug level and reports success.
 *
 * @param path path to delete
 * @return the result of the delete request, or true for the root directory
 * @throws IOException on a failure
 */
public boolean deleteObject(Path path) throws IOException {
  final SwiftObjectPath target = toObjectPath(path);
  if (SwiftUtils.isRootDir(target)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not deleting root directory entry");
    }
    return true;
  }
  return swiftRestClient.delete(target);
}
/**
 * Write the shared {@code data} array as a partitioned file, rename it, and
 * check that the destination has the expected length and content while the
 * source file and its first partition no longer exist.
 */
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testRenamePartitionedFile() throws Throwable {
  Path src = new Path("/test/testRenamePartitionedFileSrc");
  int len = data.length;
  SwiftTestUtils.writeDataset(fs, src, data, len, 1024, false);
  assertExists("Exists", src);
  String partOneName = SwiftUtils.partitionFilenameFromNumber(1);
  Path srcPart = new Path(src, partOneName);
  Path dest = new Path("/test/testRenamePartitionedFileDest");
  // An unused "destPart" local used to be declared here; it was built from
  // src instead of dest (copy-paste slip) and never asserted on, so it is gone.
  assertExists("Partition Exists", srcPart);
  fs.rename(src, dest);
  assertPathExists(fs, "dest file missing", dest);
  FileStatus status = fs.getFileStatus(dest);
  assertEquals("Length of renamed file is wrong", len, status.getLen());
  byte[] destData = readDataset(fs, dest, len);
  //compare data
  SwiftTestUtils.compareByteArrays(data, destData, len);
  String srcLs = SwiftTestUtils.ls(fs, src);
  // the destination listing was captured but never used; the call is kept so
  // the test still performs the same filesystem operations
  SwiftTestUtils.ls(fs, dest);
  assertPathDoesNotExist("deleted file still found in " + srcLs, src);
  assertPathDoesNotExist("partition file still found in " + srcLs, srcPart);
}
/**
 * Build the request URI for a Swift object by joining the endpoint URI with
 * the encoded URI path of the object.
 * This is public for unit testing.
 *
 * @param path path to object
 * @param endpointURI domain url e.g. http://domain.com; must not be null
 * @return valid URI for object
 * @throws SwiftException if the joined string is not a syntactically valid URI
 */
public static URI pathToURI(SwiftObjectPath path,
    URI endpointURI) throws SwiftException {
  checkNotNull(endpointURI, "Null Endpoint -client is not authenticated");
  // starts out as the bare endpoint and becomes the full object location,
  // so the error message below reports whichever value was reached
  String location = endpointURI.toString();
  try {
    final String encodedObjectPath = encodeUrl(path.toUriPath());
    location = SwiftUtils.joinPaths(location, encodedObjectPath);
    return new URI(location);
  } catch (URISyntaxException e) {
    throw new SwiftException("Failed to create URI from " + location, e);
  }
}
/**
 * Test whether a path refers to a file: its status can be retrieved and
 * {@code SwiftUtils.isDirectory} does not classify that status as a directory.
 *
 * @param f path to probe
 * @return false if the path does not exist or is classified as a directory
 * @throws IOException on failures other than the path being missing
 */
@Override
public boolean isFile(Path f) throws IOException {
  try {
    return !SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a file
    return false;
  }
}
/**
 * Test whether a path refers to a directory: its status can be retrieved and
 * {@code SwiftUtils.isDirectory} classifies that status as a directory.
 *
 * @param f path to probe
 * @return false if the path does not exist or is not classified as a directory
 * @throws IOException on failures other than the path being missing
 */
@Override
public boolean isDirectory(Path f) throws IOException {
  try {
    return SwiftUtils.isDirectory(getFileStatus(f));
  } catch (FileNotFoundException notFound) {
    // a missing path is not a directory
    return false;
  }
}
/**
 * Make one attempt to upload the whole local backup file to the given path.
 *
 * @param keypath destination path of the upload
 * @param attempt attempt counter; only used in the debug log message
 * @return the length of the backup file, measured before the upload starts
 * @throws IOException if the backup file cannot be opened or the upload fails
 */
private long uploadFileAttempt(Path keypath, int attempt) throws IOException {
  long uploadLen = backupFile.length();
  SwiftUtils.debug(LOG, "Closing write of file %s;" +
      " localfile=%s of length %d - attempt %d",
      key,
      backupFile,
      uploadLen,
      attempt);
  // Own the stream in this method so the file handle is released even when
  // the upload throws. Closing a stream that is already closed has no effect,
  // so this stays safe if the store closes the stream itself.
  // NOTE(review): assumes uploadFile() has finished reading the stream by
  // the time it returns - confirm against the store implementation.
  try (FileInputStream fileStream = new FileInputStream(backupFile)) {
    nativeStore.uploadFile(keypath,
        fileStream,
        uploadLen);
  }
  return uploadLen;
}