下面列出了org.apache.hadoop.fs.swift.util.SwiftUtils#debug ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private long uploadFilePartAttempt(int attempt) throws IOException {
long uploadLen = backupFile.length();
SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
" localfile=%s of length %d - attempt %d",
partNumber,
key,
backupFile,
uploadLen,
attempt);
nativeStore.uploadFilePart(new Path(key),
partNumber,
new FileInputStream(backupFile),
uploadLen);
return uploadLen;
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
SwiftUtils.validateReadArgs(b, off, len);
int result = -1;
try {
verifyOpen();
result = httpStream.read(b, off, len);
} catch (IOException e) {
//other IO problems are viewed as transient and re-attempted
LOG.info("Received IOException while reading '" + path +
"', attempting to reopen: " + e);
LOG.debug("IOE on read()" + e, e);
if (reopenBuffer()) {
result = httpStream.read(b, off, len);
}
}
if (result > 0) {
incPos(result);
if (statistics != null) {
statistics.incrementBytesRead(result);
}
}
return result;
}
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private long uploadFilePartAttempt(int attempt) throws IOException {
long uploadLen = backupFile.length();
SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
" localfile=%s of length %d - attempt %d",
partNumber,
key,
backupFile,
uploadLen,
attempt);
nativeStore.uploadFilePart(new Path(key),
partNumber,
new FileInputStream(backupFile),
uploadLen);
return uploadLen;
}
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private long uploadFilePartAttempt(int attempt) throws IOException {
long uploadLen = backupFile.length();
SwiftUtils.debug(LOG, "Uploading part %d of file %s;" +
" localfile=%s of length %d - attempt %d",
partNumber,
key,
backupFile,
uploadLen,
attempt);
nativeStore.uploadFilePart(new Path(key),
partNumber,
new FileInputStream(backupFile),
uploadLen);
return uploadLen;
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
SwiftUtils.debug(LOG, "read(buffer, %d, %d)", off, len);
SwiftUtils.validateReadArgs(b, off, len);
int result = -1;
try {
verifyOpen();
result = httpStream.read(b, off, len);
} catch (IOException e) {
//other IO problems are viewed as transient and re-attempted
LOG.info("Received IOException while reading '" + path +
"', attempting to reopen: " + e);
LOG.debug("IOE on read()" + e, e);
if (reopenBuffer()) {
result = httpStream.read(b, off, len);
}
}
if (result > 0) {
incPos(result);
if (statistics != null) {
statistics.incrementBytesRead(result);
}
}
return result;
}
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private long uploadFileAttempt(Path keypath, int attempt) throws IOException {
long uploadLen = backupFile.length();
SwiftUtils.debug(LOG, "Closing write of file %s;" +
" localfile=%s of length %d - attempt %d",
key,
backupFile,
uploadLen,
attempt);
nativeStore.uploadFile(keypath,
new FileInputStream(backupFile),
uploadLen);
return uploadLen;
}
private void delete(File file) {
if (file != null) {
SwiftUtils.debug(LOG, "deleting %s", file);
if (!file.delete()) {
LOG.warn("Could not delete " + file);
}
}
}
@Override
public synchronized void write(byte[] buffer, int offset, int len) throws
IOException {
//validate args
if (offset < 0 || len < 0 || (offset + len) > buffer.length) {
throw new IndexOutOfBoundsException("Invalid offset/length for write");
}
//validate the output stream
verifyOpen();
SwiftUtils.debug(LOG, " write(offset=%d, len=%d)", offset, len);
// if the size of file is greater than the partition limit
while (blockOffset + len >= filePartSize) {
// - then partition the blob and upload as many partitions
// are needed.
//how many bytes to write for this partition.
int subWriteLen = (int) (filePartSize - blockOffset);
if (subWriteLen < 0 || subWriteLen > len) {
throw new SwiftInternalStateException("Invalid subwrite len: "
+ subWriteLen
+ " -buffer len: " + len);
}
writeToBackupStream(buffer, offset, subWriteLen);
//move the offset along and length down
offset += subWriteLen;
len -= subWriteLen;
//now upload the partition that has just been filled up
// (this also sets blockOffset=0)
partUpload(false);
}
//any remaining data is now written
writeToBackupStream(buffer, offset, len);
}
/**
* Fill the buffer from the target position
* If the target position == current position, the
* read still goes ahead; this is a way of handling partial read failures
* @param targetPos target position
* @throws IOException IO problems on the read
*/
private void fillBuffer(long targetPos) throws IOException {
long length = targetPos + bufferSize;
SwiftUtils.debug(LOG, "Fetching %d bytes starting at %d", length, targetPos);
HttpBodyContent blob = nativeStore.getObject(path, targetPos, length);
httpStream = blob.getInputStream();
updateStartOfBufferPosition(targetPos, blob.getContentLength());
}
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private long uploadFileAttempt(Path keypath, int attempt) throws IOException {
long uploadLen = backupFile.length();
SwiftUtils.debug(LOG, "Closing write of file %s;" +
" localfile=%s of length %d - attempt %d",
key,
backupFile,
uploadLen,
attempt);
nativeStore.uploadFile(keypath,
new FileInputStream(backupFile),
uploadLen);
return uploadLen;
}
private void delete(File file) {
if (file != null) {
SwiftUtils.debug(LOG, "deleting %s", file);
if (!file.delete()) {
LOG.warn("Could not delete " + file);
}
}
}
/**
* Upload a single partition. This deletes the local backing-file,
* and re-opens it to create a new one.
* @param closingUpload is this the final upload of an upload
* @throws IOException on IO problems
*/
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private void partUpload(boolean closingUpload) throws IOException {
if (backupStream != null) {
backupStream.close();
}
if (closingUpload && partUpload && backupFile.length() == 0) {
//skipping the upload if
// - it is close time
// - the final partition is 0 bytes long
// - one part has already been written
SwiftUtils.debug(LOG, "skipping upload of 0 byte final partition");
delete(backupFile);
} else {
partUpload = true;
boolean uploadSuccess = false;
int attempt = 0;
while(!uploadSuccess) {
try {
++attempt;
bytesUploaded += uploadFilePartAttempt(attempt);
uploadSuccess = true;
} catch (IOException e) {
LOG.info("Upload failed " + e, e);
if (attempt > ATTEMPT_LIMIT) {
throw e;
}
}
}
delete(backupFile);
partNumber++;
blockOffset = 0;
if (!closingUpload) {
//if not the final upload, create a new output stream
backupFile = newBackupFile();
backupStream =
new BufferedOutputStream(new FileOutputStream(backupFile));
}
}
}
/**
* Seek to an offset. If the data is already in the buffer, move to it
* @param targetPos target position
* @throws IOException on any problem
*/
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos < 0) {
throw new EOFException(
FSExceptionMessages.NEGATIVE_SEEK);
}
//there's some special handling of near-local data
//as the seek can be omitted if it is in/adjacent
long offset = targetPos - pos;
if (LOG.isDebugEnabled()) {
LOG.debug("Seek to " + targetPos + "; current pos =" + pos
+ "; offset="+offset);
}
if (offset == 0) {
LOG.debug("seek is no-op");
return;
}
if (offset < 0) {
LOG.debug("seek is backwards");
} else if ((rangeOffset + offset < bufferSize)) {
//if the seek is in range of that requested, scan forwards
//instead of closing and re-opening a new HTTP connection
SwiftUtils.debug(LOG,
"seek is within current stream"
+ "; pos= %d ; targetPos=%d; "
+ "offset= %d ; bufferOffset=%d",
pos, targetPos, offset, rangeOffset);
try {
LOG.debug("chomping ");
chompBytes(offset);
} catch (IOException e) {
//this is assumed to be recoverable with a seek -or more likely to fail
LOG.debug("while chomping ",e);
}
if (targetPos - pos == 0) {
LOG.trace("chomping successful");
return;
}
LOG.trace("chomping failed");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Seek is beyond buffer size of " + bufferSize);
}
}
innerClose("seeking to " + targetPos);
fillBuffer(targetPos);
}
/**
* tests functionality for big files ( > 5Gb) upload
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testFilePartUpload() throws Throwable {
final Path path = new Path("/test/testFilePartUpload");
int len = 8192;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
try {
int totalPartitionsToWrite = len / PART_SIZE_BYTES;
assertPartitionsWritten("Startup", out, 0);
//write 2048
int firstWriteLen = 2048;
out.write(src, 0, firstWriteLen);
//assert
long expected = getExpectedPartitionsWritten(firstWriteLen,
PART_SIZE_BYTES,
false);
SwiftUtils.debug(LOG, "First write: predict %d partitions written",
expected);
assertPartitionsWritten("First write completed", out, expected);
//write the rest
int remainder = len - firstWriteLen;
SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder);
out.write(src, firstWriteLen, remainder);
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false);
assertPartitionsWritten("Remaining data", out, expected);
out.close();
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
assertPartitionsWritten("Stream closed", out, expected);
Header[] headers = fs.getStore().getObjectHeaders(path, true);
for (Header header : headers) {
LOG.info(header.toString());
}
byte[] dest = readDataset(fs, path, len);
LOG.info("Read dataset from " + path + ": data length =" + len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
FileStatus status;
final Path qualifiedPath = path.makeQualified(fs);
status = fs.getFileStatus(qualifiedPath);
//now see what block location info comes back.
//This will vary depending on the Swift version, so the results
//aren't checked -merely that the test actually worked
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len);
assertNotNull("Null getFileBlockLocations()", locations);
assertTrue("empty array returned for getFileBlockLocations()",
locations.length > 0);
//last bit of test -which seems to play up on partitions, which we download
//to a skip
try {
validatePathLen(path, len);
} catch (AssertionError e) {
//downgrade to a skip
throw new AssumptionViolatedException(e, null);
}
} finally {
IOUtils.closeStream(out);
}
}
/**
* tests functionality for big files ( > 5Gb) upload
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testFilePartUploadNoLengthCheck() throws IOException, URISyntaxException {
final Path path = new Path("/test/testFilePartUploadLengthCheck");
int len = 8192;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
try {
int totalPartitionsToWrite = len / PART_SIZE_BYTES;
assertPartitionsWritten("Startup", out, 0);
//write 2048
int firstWriteLen = 2048;
out.write(src, 0, firstWriteLen);
//assert
long expected = getExpectedPartitionsWritten(firstWriteLen,
PART_SIZE_BYTES,
false);
SwiftUtils.debug(LOG, "First write: predict %d partitions written",
expected);
assertPartitionsWritten("First write completed", out, expected);
//write the rest
int remainder = len - firstWriteLen;
SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder);
out.write(src, firstWriteLen, remainder);
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false);
assertPartitionsWritten("Remaining data", out, expected);
out.close();
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
assertPartitionsWritten("Stream closed", out, expected);
Header[] headers = fs.getStore().getObjectHeaders(path, true);
for (Header header : headers) {
LOG.info(header.toString());
}
byte[] dest = readDataset(fs, path, len);
LOG.info("Read dataset from " + path + ": data length =" + len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
FileStatus status = fs.getFileStatus(path);
//now see what block location info comes back.
//This will vary depending on the Swift version, so the results
//aren't checked -merely that the test actually worked
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len);
assertNotNull("Null getFileBlockLocations()", locations);
assertTrue("empty array returned for getFileBlockLocations()",
locations.length > 0);
} finally {
IOUtils.closeStream(out);
}
}
/**
* Upload a single partition. This deletes the local backing-file,
* and re-opens it to create a new one.
* @param closingUpload is this the final upload of an upload
* @throws IOException on IO problems
*/
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
private void partUpload(boolean closingUpload) throws IOException {
if (backupStream != null) {
backupStream.close();
}
if (closingUpload && partUpload && backupFile.length() == 0) {
//skipping the upload if
// - it is close time
// - the final partition is 0 bytes long
// - one part has already been written
SwiftUtils.debug(LOG, "skipping upload of 0 byte final partition");
delete(backupFile);
} else {
partUpload = true;
boolean uploadSuccess = false;
int attempt = 0;
while(!uploadSuccess) {
try {
++attempt;
bytesUploaded += uploadFilePartAttempt(attempt);
uploadSuccess = true;
} catch (IOException e) {
LOG.info("Upload failed " + e, e);
if (attempt > ATTEMPT_LIMIT) {
throw e;
}
}
}
delete(backupFile);
partNumber++;
blockOffset = 0;
if (!closingUpload) {
//if not the final upload, create a new output stream
backupFile = newBackupFile();
backupStream =
new BufferedOutputStream(new FileOutputStream(backupFile));
}
}
}
/**
* Seek to an offset. If the data is already in the buffer, move to it
* @param targetPos target position
* @throws IOException on any problem
*/
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos < 0) {
throw new EOFException(
FSExceptionMessages.NEGATIVE_SEEK);
}
//there's some special handling of near-local data
//as the seek can be omitted if it is in/adjacent
long offset = targetPos - pos;
if (LOG.isDebugEnabled()) {
LOG.debug("Seek to " + targetPos + "; current pos =" + pos
+ "; offset="+offset);
}
if (offset == 0) {
LOG.debug("seek is no-op");
return;
}
if (offset < 0) {
LOG.debug("seek is backwards");
} else if ((rangeOffset + offset < bufferSize)) {
//if the seek is in range of that requested, scan forwards
//instead of closing and re-opening a new HTTP connection
SwiftUtils.debug(LOG,
"seek is within current stream"
+ "; pos= %d ; targetPos=%d; "
+ "offset= %d ; bufferOffset=%d",
pos, targetPos, offset, rangeOffset);
try {
LOG.debug("chomping ");
chompBytes(offset);
} catch (IOException e) {
//this is assumed to be recoverable with a seek -or more likely to fail
LOG.debug("while chomping ",e);
}
if (targetPos - pos == 0) {
LOG.trace("chomping successful");
return;
}
LOG.trace("chomping failed");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Seek is beyond buffer size of " + bufferSize);
}
}
innerClose("seeking to " + targetPos);
fillBuffer(targetPos);
}
public synchronized void realSeek(long targetPos) throws IOException {
if (targetPos < 0) {
throw new IOException("Negative Seek offset not supported");
}
//there's some special handling of near-local data
//as the seek can be omitted if it is in/adjacent
long offset = targetPos - pos;
if (LOG.isDebugEnabled()) {
LOG.debug("Seek to " + targetPos + "; current pos =" + pos
+ "; offset="+offset);
}
if (offset == 0) {
LOG.debug("seek is no-op");
return;
}
if (offset < 0) {
LOG.debug("seek is backwards");
} else if ((rangeOffset + offset < bufferSize)) {
//if the seek is in range of that requested, scan forwards
//instead of closing and re-opening a new HTTP connection
SwiftUtils.debug(LOG,
"seek is within current stream"
+ "; pos= %d ; targetPos=%d; "
+ "offset= %d ; bufferOffset=%d",
pos, targetPos, offset, rangeOffset);
try {
LOG.debug("chomping ");
chompBytes(offset);
} catch (IOException e) {
//this is assumed to be recoverable with a seek -or more likely to fail
LOG.debug("while chomping ",e);
}
if (targetPos - pos == 0) {
LOG.trace("chomping successful");
return;
}
LOG.trace("chomping failed");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Seek is beyond buffer size of " + bufferSize);
}
}
innerClose("seeking to " + targetPos);
fillBuffer(targetPos);
}
/**
* tests functionality for big files ( > 5Gb) upload
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testFilePartUpload() throws Throwable {
final Path path = new Path("/test/testFilePartUpload");
int len = 8192;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
try {
int totalPartitionsToWrite = len / PART_SIZE_BYTES;
assertPartitionsWritten("Startup", out, 0);
//write 2048
int firstWriteLen = 2048;
out.write(src, 0, firstWriteLen);
//assert
long expected = getExpectedPartitionsWritten(firstWriteLen,
PART_SIZE_BYTES,
false);
SwiftUtils.debug(LOG, "First write: predict %d partitions written",
expected);
assertPartitionsWritten("First write completed", out, expected);
//write the rest
int remainder = len - firstWriteLen;
SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder);
out.write(src, firstWriteLen, remainder);
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false);
assertPartitionsWritten("Remaining data", out, expected);
out.close();
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
assertPartitionsWritten("Stream closed", out, expected);
Header[] headers = fs.getStore().getObjectHeaders(path, true);
for (Header header : headers) {
LOG.info(header.toString());
}
byte[] dest = readDataset(fs, path, len);
LOG.info("Read dataset from " + path + ": data length =" + len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
FileStatus status;
final Path qualifiedPath = path.makeQualified(fs);
status = fs.getFileStatus(qualifiedPath);
//now see what block location info comes back.
//This will vary depending on the Swift version, so the results
//aren't checked -merely that the test actually worked
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len);
assertNotNull("Null getFileBlockLocations()", locations);
assertTrue("empty array returned for getFileBlockLocations()",
locations.length > 0);
//last bit of test -which seems to play up on partitions, which we download
//to a skip
try {
validatePathLen(path, len);
} catch (AssertionError e) {
//downgrade to a skip
throw new AssumptionViolatedException(e, null);
}
} finally {
IOUtils.closeStream(out);
}
}
/**
* tests functionality for big files ( > 5Gb) upload
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testFilePartUploadNoLengthCheck() throws IOException, URISyntaxException {
final Path path = new Path("/test/testFilePartUploadLengthCheck");
int len = 8192;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
try {
int totalPartitionsToWrite = len / PART_SIZE_BYTES;
assertPartitionsWritten("Startup", out, 0);
//write 2048
int firstWriteLen = 2048;
out.write(src, 0, firstWriteLen);
//assert
long expected = getExpectedPartitionsWritten(firstWriteLen,
PART_SIZE_BYTES,
false);
SwiftUtils.debug(LOG, "First write: predict %d partitions written",
expected);
assertPartitionsWritten("First write completed", out, expected);
//write the rest
int remainder = len - firstWriteLen;
SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder);
out.write(src, firstWriteLen, remainder);
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false);
assertPartitionsWritten("Remaining data", out, expected);
out.close();
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
assertPartitionsWritten("Stream closed", out, expected);
Header[] headers = fs.getStore().getObjectHeaders(path, true);
for (Header header : headers) {
LOG.info(header.toString());
}
byte[] dest = readDataset(fs, path, len);
LOG.info("Read dataset from " + path + ": data length =" + len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
FileStatus status = fs.getFileStatus(path);
//now see what block location info comes back.
//This will vary depending on the Swift version, so the results
//aren't checked -merely that the test actually worked
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len);
assertNotNull("Null getFileBlockLocations()", locations);
assertTrue("empty array returned for getFileBlockLocations()",
locations.length > 0);
} finally {
IOUtils.closeStream(out);
}
}