下面列出了com.amazonaws.services.s3.model.ObjectTagging#com.amazonaws.services.s3.model.PartETag 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Validates the configured builder settings and assembles an {@code S3MultipartOutputStream}.
 *
 * @return a new output stream writing to {@code bucket}/{@code key}
 */
public AbortableOutputStream build() {
    // Preconditions are checked in a fixed order so the same setting is reported first.
    checkNotNull(s3);
    checkArgument(trimToNull(bucket) != null);
    checkArgument(trimToNull(key) != null);
    checkArgument(partSize >= MINIMUM_PART_SIZE);
    checkArgument(maxAttempts > 0);
    checkArgument(sleepSeconds >= 0);
    checkArgument(poolSize > 0);
    checkArgument(queueSize > 0);

    final S3MultipartUpload multipartUpload =
            new S3MultipartUpload(s3, bucket, key, enableServerSideEncryption);
    final RetryTemplate retryTemplate = new RetryTemplate(maxAttempts, sleepSeconds);
    final ExecutorService blockingExecutor = new BlockingExecutor(poolSize, queueSize);
    final AsyncHandler<PartETag> partHandler = new AsyncHandler<>(blockingExecutor);
    return new S3MultipartOutputStream(multipartUpload, partSize, retryTemplate, partHandler);
}
/**
 * Blocks until every outstanding part upload has completed.
 *
 * <p>If any part upload fails, all part-upload futures are cancelled, the multipart upload is
 * aborted, and the failure is rethrown as an {@code IOException}.
 *
 * @return the ETags of all uploaded parts; never {@code null}
 * @throws java.io.InterruptedIOException if the calling thread is interrupted while waiting
 *     (the thread's interrupt flag is restored before throwing)
 * @throws IOException if any part upload failed
 */
private List<PartETag> waitForAllPartUploads() throws IOException {
    LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
    try {
        return Futures.allAsList(partETagsFutures).get();
    } catch (InterruptedException ie) {
        LOG.warn("Interrupted partUpload", ie);
        // Restore the interrupt flag so code further up the stack can observe it.
        Thread.currentThread().interrupt();
        // Returning null here would make a caller that completes the upload with this list
        // fail with a NullPointerException; report the interruption as an IOException instead.
        java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
                "Interrupted while waiting for multi-part upload with id '" + uploadId + "' to " + key);
        interrupted.initCause(ie);
        throw interrupted;
    } catch (ExecutionException ee) {
        // there is no way of recovering so abort
        // cancel all partUploads
        LOG.debug("While waiting for upload completion", ee);
        LOG.debug("Cancelling futures");
        for (ListenableFuture<PartETag> future : partETagsFutures) {
            future.cancel(true);
        }
        // abort multipartupload
        abort();
        throw extractException("Multi-part upload with id '" + uploadId + "' to " + key, key, ee);
    }
}
/**
 * Finishes the multipart upload.
 *
 * <p>Calls {@code performUpload()} (presumably submits any remaining buffered data), then waits
 * for all part uploads. It completes the multipart upload only if at least one part was
 * uploaded. If waiting or completing fails, the multipart upload is aborted and the original
 * failure is rethrown; a failure while aborting is attached to it as a suppressed exception.
 *
 * <p>Once the wait/complete phase has run, the stream is marked closed even if that phase
 * failed. A repeated {@code close()} is then a no-op rather than re-running the upload
 * against an already-aborted upload id.
 *
 * @throws IOException if the upload could not be completed
 */
@Override
public void close() throws IOException {
    if (closedOrAborted) {
        return;
    }
    performUpload();
    try {
        List<PartETag> partETags = asyncHandler.waitForCompletion();
        if (!partETags.isEmpty()) {
            upload.complete(uploadId, partETags);
        }
    } catch (Exception e) {
        try {
            upload.abort(uploadId);
        } catch (Exception abortFailure) {
            // Keep the root cause as the primary exception.
            e.addSuppressed(abortFailure);
        }
        throw e;
    } finally {
        closedOrAborted = true;
        asyncHandler.close();
    }
}
/** Uploading a part forwards every part attribute to S3 and returns the resulting ETag. */
@Test
public void upload() {
    ArgumentCaptor<UploadPartRequest> captor = ArgumentCaptor.forClass(UploadPartRequest.class);
    UploadPartResult uploadResult = mock(UploadPartResult.class);
    PartETag expectedTag = mock(PartETag.class);
    when(uploadResult.getPartETag()).thenReturn(expectedTag);
    when(s3.uploadPart(captor.capture())).thenReturn(uploadResult);
    InputStream data = mock(InputStream.class);

    PartETag actualTag = underTest.upload(UPLOAD_ID, new S3Part(1, 2, "md5", data));

    assertThat(actualTag, is(expectedTag));
    UploadPartRequest sent = captor.getValue();
    assertThat(sent.getBucketName(), is(BUCKET));
    assertThat(sent.getKey(), is(KEY));
    assertThat(sent.getPartNumber(), is(1));
    assertThat(sent.getPartSize(), is(2L));
    assertThat(sent.getMd5Digest(), is("md5"));
    assertThat(sent.getInputStream(), is(data));
}
/** Completing an upload sends bucket, key, upload id and part list to S3 unchanged. */
@Test
public void complete() {
    InitiateMultipartUploadResult initiateResult = mock(InitiateMultipartUploadResult.class);
    when(initiateResult.getUploadId()).thenReturn(UPLOAD_ID);
    when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initiateResult);
    ArgumentCaptor<CompleteMultipartUploadRequest> captor =
            ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
    @SuppressWarnings("unchecked") // mocking a generic interface
    List<PartETag> tags = mock(List.class);
    when(s3.completeMultipartUpload(captor.capture())).thenReturn(null);

    underTest.start();
    underTest.complete(UPLOAD_ID, tags);

    CompleteMultipartUploadRequest sent = captor.getValue();
    assertThat(sent.getBucketName(), is(BUCKET));
    assertThat(sent.getKey(), is(KEY));
    assertThat(sent.getUploadId(), is(UPLOAD_ID));
    assertThat(sent.getPartETags(), is(tags));
}
/**
 * The first part upload sleeps far longer than the 1 s test timeout, while later invocations
 * throw. The test can only pass if closing the stream reacts to the failure without waiting
 * for the slow part, and the multipart upload is aborted.
 */
@Test(timeout = 1000L)
public void failFastUploadingOnCompleteUploads() throws IOException {
    when(upload.start()).thenReturn(UPLOAD_ID);
    Answer<PartETag> slowUpload = new Answer<PartETag>() {
        @Override
        public PartETag answer(InvocationOnMock invocation) throws Throwable {
            Thread.sleep(5000L);
            return partETag2;
        }
    };
    when(upload.upload(anyString(), any(S3Part.class))).then(slowUpload).thenThrow(IOException.class);
    try {
        OutputStream stream = new S3MultipartOutputStream(upload, 1, retry, asyncHandler);
        stream.write(new byte[] { 0 });
        stream.write(new byte[] { 1 });
        stream.close();
        fail();
    } catch (Exception expected) {
        verify(upload).abort(UPLOAD_ID);
    }
}
/**
 * Creates the bookkeeping state for a multipart upload.
 *
 * @param objectName the name of the object being written
 * @param uploadId the id of the multipart upload
 * @param completeParts ETags of parts already uploaded; stored as given, not copied
 * @param numBytes the expected size in bytes so far; must be non-negative
 * @param incompletePart optional local file (presumably a not-yet-uploaded trailing part)
 */
MultiPartUploadInfo(
        final String objectName,
        final String uploadId,
        final List<PartETag> completeParts,
        final long numBytes,
        final Optional<File> incompletePart) {
    checkArgument(numBytes >= 0L);

    this.objectName = checkNotNull(objectName);
    this.uploadId = checkNotNull(uploadId);
    this.completeParts = checkNotNull(completeParts);
    this.incompletePart = checkNotNull(incompletePart);

    // Every already-completed part counts as a registered part.
    this.numberOfRegisteredParts = this.completeParts.size();
    this.expectedSizeInBytes = numBytes;
}
/**
 * Creates a recoverable snapshot of a multipart upload.
 *
 * @param objectName the name of the target object
 * @param uploadId the id of the multipart upload
 * @param parts ETags of the completed parts
 * @param numBytesInParts total bytes in the completed parts; must be non-negative
 * @param lastPartObject name of a separately stored trailing object, or {@code null}
 * @param lastPartObjectLength length of that trailing object; must be positive when it is set
 */
S3Recoverable(
        String objectName,
        String uploadId,
        List<PartETag> parts,
        long numBytesInParts,
        @Nullable String lastPartObject,
        long lastPartObjectLength
) {
    checkArgument(numBytesInParts >= 0L);
    // A trailing object, if present, must not be empty.
    checkArgument(!(lastPartObject != null && lastPartObjectLength <= 0L));

    this.objectName = checkNotNull(objectName);
    this.uploadId = checkNotNull(uploadId);
    this.parts = checkNotNull(parts);
    this.numBytesInParts = numBytesInParts;
    this.lastPartObject = lastPartObject;
    this.lastPartObjectLength = lastPartObjectLength;
}
private RecoverableMultiPartUploadImpl(
S3AccessHelper s3AccessHelper,
Executor uploadThreadPool,
String uploadId,
String objectName,
List<PartETag> partsSoFar,
long numBytes,
Optional<File> incompletePart
) {
checkArgument(numBytes >= 0L);
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.uploadThreadPool = checkNotNull(uploadThreadPool);
this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
this.uploadsInProgress = new ArrayDeque<>();
}
/**
 * Adds a part to the uploads without any size limitations.
 *
 * <p>This method is non-blocking and does not wait for the part upload to complete.
 *
 * @param file The file with the part data. It must already be closed.
 *
 * @throws IOException If this method throws an exception, the RecoverableS3MultiPartUpload
 * should not be used any more, but recovered instead.
 */
@Override
public void uploadPart(RefCountedFSOutputStream file) throws IOException {
	// this is to guarantee that nobody is
	// writing to the file we are uploading.
	checkState(file.isClosed());
	// The future is handed to the UploadTask, which is expected to complete it.
	// NOTE(review): presumably awaitPendingPartsUpload() waits on uploadsInProgress — confirm.
	final CompletableFuture<PartETag> future = new CompletableFuture<>();
	uploadsInProgress.add(future);
	final long partLength = file.getPos();
	// Must run before the UploadTask is constructed: UploadTask takes its part number from
	// currentUploadInfo.getNumberOfRegisteredParts().
	currentUploadInfo.registerNewPart(partLength);
	file.retain(); // keep the file while the async upload still runs
	// NOTE(review): if the UploadTask constructor rejects the part number (> 10,000), the file
	// has already been retained and the queued future is never completed.
	uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
}
/**
 * Creates a snapshot of this multipart upload from which the upload can later be resumed.
 *
 * <p>Locally buffered data smaller than
 * {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE}
 * cannot be uploaded as part of the MPU, so it is stored in S3 as an independent object instead.
 *
 * <p>This implementation currently blocks until all pending part uploads are complete.
 *
 * @param incompletePartFile buffered trailing data, or {@code null} if there is none
 * @return a recoverable describing the completed parts and, if present, the trailing object
 * @throws IOException if uploading the small part or waiting for pending uploads fails
 */
@Override
public S3Recoverable snapshotAndGetRecoverable(@Nullable final RefCountedFSOutputStream incompletePartFile) throws IOException {
	final String smallPartObjectName = safelyUploadSmallPart(incompletePartFile);

	// Blocking for now; intended to become non-blocking in the future.
	awaitPendingPartsUpload();

	final String objectName = currentUploadInfo.getObjectName();
	final String uploadId = currentUploadInfo.getUploadId();
	final List<PartETag> completedParts = currentUploadInfo.getCopyOfEtagsOfCompleteParts();
	final long sizeInBytes = currentUploadInfo.getExpectedSizeInBytes();

	if (smallPartObjectName != null) {
		return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes,
				smallPartObjectName, incompletePartFile.getPos());
	}
	return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes);
}
/**
 * Recreates a multipart upload from previously persisted state.
 *
 * <p>The list of parts is copied, so the caller's list is not shared with the new instance.
 */
public static RecoverableMultiPartUploadImpl recoverUpload(
		final S3AccessHelper s3AccessHelper,
		final Executor uploadThreadPool,
		final String multipartUploadId,
		final String objectName,
		final List<PartETag> partsSoFar,
		final long numBytesSoFar,
		final Optional<File> incompletePart) {
	final List<PartETag> ownParts = new ArrayList<>(partsSoFar);
	return new RecoverableMultiPartUploadImpl(
			s3AccessHelper, uploadThreadPool, multipartUploadId, objectName,
			ownParts, numBytesSoFar, incompletePart);
}
/**
 * Captures everything needed to upload one part asynchronously.
 *
 * <p>The part number is the current count of registered parts, so the part must be
 * registered before this task is created.
 */
UploadTask(
		final S3AccessHelper s3AccessHelper,
		final MultiPartUploadInfo currentUpload,
		final RefCountedFSOutputStream file,
		final CompletableFuture<PartETag> future) {
	checkNotNull(currentUpload);
	this.objectName = currentUpload.getObjectName();
	this.uploadId = currentUpload.getUploadId();
	this.partNumber = currentUpload.getNumberOfRegisteredParts();

	// S3 only accepts part numbers from 1 to 10,000.
	checkArgument(1 <= partNumber && partNumber <= 10_000);

	this.s3AccessHelper = checkNotNull(s3AccessHelper);
	this.file = checkNotNull(file);
	this.future = checkNotNull(future);
}
/**
 * Builds a test recoverable with one ETag per complete part (numbered from 1) and a trailing
 * object whose name is a placeholder.
 */
private static S3Recoverable createS3Recoverable(byte[] incompletePart, byte[]... completeParts) {
	final List<PartETag> eTags = new ArrayList<>();
	long completeBytes = 0L;
	for (int i = 0; i < completeParts.length; i++) {
		final int partNumber = i + 1;
		eTags.add(new PartETag(partNumber, createETag(TEST_OBJECT_NAME, partNumber)));
		completeBytes += completeParts[i].length;
	}
	final String uploadId = createMPUploadId(TEST_OBJECT_NAME);
	final long trailingLength = incompletePart.length;
	return new S3Recoverable(TEST_OBJECT_NAME, uploadId, eTags, completeBytes,
			"IGNORED-DUE-TO-RANDOMNESS", trailingLength);
}
/**
 * Creates the bookkeeping state for a multipart upload.
 *
 * @param objectName the name of the object being written
 * @param uploadId the id of the multipart upload
 * @param completeParts ETags of parts already uploaded; stored as given, not copied
 * @param numBytes the expected size in bytes so far; must be non-negative
 * @param incompletePart optional local file (presumably a not-yet-uploaded trailing part)
 */
MultiPartUploadInfo(
		final String objectName,
		final String uploadId,
		final List<PartETag> completeParts,
		final long numBytes,
		final Optional<File> incompletePart) {
	checkArgument(numBytes >= 0L);

	this.objectName = checkNotNull(objectName);
	this.uploadId = checkNotNull(uploadId);
	this.completeParts = checkNotNull(completeParts);
	this.incompletePart = checkNotNull(incompletePart);

	// Every already-completed part counts as a registered part.
	this.numberOfRegisteredParts = this.completeParts.size();
	this.expectedSizeInBytes = numBytes;
}
/**
 * Creates a recoverable snapshot of a multipart upload.
 *
 * @param objectName the name of the target object
 * @param uploadId the id of the multipart upload
 * @param parts ETags of the completed parts
 * @param numBytesInParts total bytes in the completed parts; must be non-negative
 * @param lastPartObject name of a separately stored trailing object, or {@code null}
 * @param lastPartObjectLength length of that trailing object; must be positive when it is set
 */
S3Recoverable(
		String objectName,
		String uploadId,
		List<PartETag> parts,
		long numBytesInParts,
		@Nullable String lastPartObject,
		long lastPartObjectLength
) {
	checkArgument(numBytesInParts >= 0L);
	// A trailing object, if present, must not be empty.
	checkArgument(!(lastPartObject != null && lastPartObjectLength <= 0L));

	this.objectName = checkNotNull(objectName);
	this.uploadId = checkNotNull(uploadId);
	this.parts = checkNotNull(parts);
	this.numBytesInParts = numBytesInParts;
	this.lastPartObject = lastPartObject;
	this.lastPartObjectLength = lastPartObjectLength;
}
/**
 * Returns a human-readable description of this recoverable: the key, upload id, bytes in
 * completed parts, each part as {@code number=etag}, and the trailing object with its length.
 */
@Override
public String toString() {
	StringBuilder buf = new StringBuilder(128);
	buf.append("S3Recoverable: ");
	buf.append("key=").append(objectName);
	buf.append(", uploadId=").append(uploadId);
	buf.append(", bytesInParts=").append(numBytesInParts);
	buf.append(", parts=[");
	String separator = "";
	for (PartETag part : parts) {
		buf.append(separator).append(part.getPartNumber()).append('=').append(part.getETag());
		separator = ", ";
	}
	buf.append("], trailingPart=").append(lastPartObject);
	// The ", " separator was previously missing, producing e.g. "trailingPart=nulltrailingPartLen=0".
	buf.append(", trailingPartLen=").append(lastPartObjectLength);
	return buf.toString();
}
private RecoverableMultiPartUploadImpl(
S3AccessHelper s3AccessHelper,
Executor uploadThreadPool,
String uploadId,
String objectName,
List<PartETag> partsSoFar,
long numBytes,
Optional<File> incompletePart
) {
checkArgument(numBytes >= 0L);
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.uploadThreadPool = checkNotNull(uploadThreadPool);
this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
this.uploadsInProgress = new ArrayDeque<>();
}
/**
 * Adds a part to the uploads without any size limitations.
 *
 * <p>This method is non-blocking and does not wait for the part upload to complete.
 *
 * @param file The file with the part data. It must already be closed.
 *
 * @throws IOException If this method throws an exception, the RecoverableS3MultiPartUpload
 * should not be used any more, but recovered instead.
 */
@Override
public void uploadPart(RefCountedFSOutputStream file) throws IOException {
	// this is to guarantee that nobody is
	// writing to the file we are uploading.
	checkState(file.isClosed());
	// The future is handed to the UploadTask, which is expected to complete it.
	// NOTE(review): presumably awaitPendingPartsUpload() waits on uploadsInProgress — confirm.
	final CompletableFuture<PartETag> future = new CompletableFuture<>();
	uploadsInProgress.add(future);
	final long partLength = file.getPos();
	// Must run before the UploadTask is constructed: UploadTask takes its part number from
	// currentUploadInfo.getNumberOfRegisteredParts().
	currentUploadInfo.registerNewPart(partLength);
	file.retain(); // keep the file while the async upload still runs
	// NOTE(review): if the UploadTask constructor rejects the part number (> 10,000), the file
	// has already been retained and the queued future is never completed.
	uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
}
/**
 * Creates a snapshot of this multipart upload from which the upload can later be resumed.
 *
 * <p>Locally buffered data smaller than
 * {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE}
 * cannot be uploaded as part of the MPU, so it is stored in S3 as an independent object instead.
 *
 * <p>This implementation currently blocks until all pending part uploads are complete.
 *
 * @param incompletePartFile buffered trailing data, or {@code null} if there is none
 * @return a recoverable describing the completed parts and, if present, the trailing object
 * @throws IOException if uploading the small part or waiting for pending uploads fails
 */
@Override
public S3Recoverable snapshotAndGetRecoverable(@Nullable final RefCountedFSOutputStream incompletePartFile) throws IOException {
	final String smallPartObjectName = safelyUploadSmallPart(incompletePartFile);

	// Blocking for now; intended to become non-blocking in the future.
	awaitPendingPartsUpload();

	final String objectName = currentUploadInfo.getObjectName();
	final String uploadId = currentUploadInfo.getUploadId();
	final List<PartETag> completedParts = currentUploadInfo.getCopyOfEtagsOfCompleteParts();
	final long sizeInBytes = currentUploadInfo.getExpectedSizeInBytes();

	if (smallPartObjectName != null) {
		return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes,
				smallPartObjectName, incompletePartFile.getPos());
	}
	return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes);
}
/**
 * Recreates a multipart upload from previously persisted state.
 *
 * <p>The list of parts is copied, so the caller's list is not shared with the new instance.
 */
public static RecoverableMultiPartUploadImpl recoverUpload(
		final S3AccessHelper s3AccessHelper,
		final Executor uploadThreadPool,
		final String multipartUploadId,
		final String objectName,
		final List<PartETag> partsSoFar,
		final long numBytesSoFar,
		final Optional<File> incompletePart) {
	final List<PartETag> ownParts = new ArrayList<>(partsSoFar);
	return new RecoverableMultiPartUploadImpl(
			s3AccessHelper, uploadThreadPool, multipartUploadId, objectName,
			ownParts, numBytesSoFar, incompletePart);
}
/**
 * Captures everything needed to upload one part asynchronously.
 *
 * <p>The part number is the current count of registered parts, so the part must be
 * registered before this task is created.
 */
UploadTask(
		final S3AccessHelper s3AccessHelper,
		final MultiPartUploadInfo currentUpload,
		final RefCountedFSOutputStream file,
		final CompletableFuture<PartETag> future) {
	checkNotNull(currentUpload);
	this.objectName = currentUpload.getObjectName();
	this.uploadId = currentUpload.getUploadId();
	this.partNumber = currentUpload.getNumberOfRegisteredParts();

	// S3 only accepts part numbers from 1 to 10,000.
	checkArgument(1 <= partNumber && partNumber <= 10_000);

	this.s3AccessHelper = checkNotNull(s3AccessHelper);
	this.file = checkNotNull(file);
	this.future = checkNotNull(future);
}
/**
 * Builds a test recoverable with one ETag per complete part (numbered from 1) and a trailing
 * object whose name is a placeholder.
 */
private static S3Recoverable createS3Recoverable(byte[] incompletePart, byte[]... completeParts) {
	final List<PartETag> eTags = new ArrayList<>();
	long completeBytes = 0L;
	for (int i = 0; i < completeParts.length; i++) {
		final int partNumber = i + 1;
		eTags.add(new PartETag(partNumber, createETag(TEST_OBJECT_NAME, partNumber)));
		completeBytes += completeParts[i].length;
	}
	final String uploadId = createMPUploadId(TEST_OBJECT_NAME);
	final long trailingLength = incompletePart.length;
	return new S3Recoverable(TEST_OBJECT_NAME, uploadId, eTags, completeBytes,
			"IGNORED-DUE-TO-RANDOMNESS", trailingLength);
}
public MultipartState(String buf) {
String[] fields = buf.split(SEPARATOR);
_uploadId = fields[0];
_filePosition = Long.parseLong(fields[1]);
_partETags = new ArrayList<>();
for (String part : fields[2].split(",")) {
if (part != null && !part.isEmpty()) {
String[] partFields = part.split("/");
_partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1]));
}
}
_partSize = Long.parseLong(fields[3]);
_storageClass = StorageClass.fromValue(fields[4]);
_contentLength = Long.parseLong(fields[5]);
_timestamp = Long.parseLong(fields[6]);
}
/**
 * Serializes the state as {@code SEPARATOR}-delimited fields: upload id, file position,
 * comma-separated {@code partNumber/eTag} entries, part size, storage class, content length
 * and timestamp.
 */
@Override
public String toString() {
    StringBuilder buf = new StringBuilder();
    buf.append(_uploadId).append(SEPARATOR);
    buf.append(_filePosition.toString()).append(SEPARATOR);

    String delimiter = "";
    for (PartETag tag : _partETags) {
        buf.append(delimiter).append(String.format("%d/%s", tag.getPartNumber(), tag.getETag()));
        delimiter = ",";
    }

    buf.append(SEPARATOR);
    buf.append(_partSize.toString()).append(SEPARATOR);
    buf.append(_storageClass.toString()).append(SEPARATOR);
    buf.append(_contentLength.toString()).append(SEPARATOR);
    buf.append(_timestamp.toString());
    return buf.toString();
}
/**
 * Completes a multipart upload in the configured bucket.
 *
 * @param s3Key key of the uploaded object
 * @param uploadId the multipart upload id
 * @param partETags ETags of the uploaded parts
 * @throws ModelDBException {@code INTERNAL} if the bucket does not exist;
 *     {@code FAILED_PRECONDITION} if S3 rejects the completion with HTTP 400
 */
@Override
public void commitMultipart(String s3Key, String uploadId, List<PartETag> partETags)
    throws ModelDBException {
  // Validate bucket
  if (!doesBucketExist(bucketName)) {
    throw new ModelDBException("Bucket does not exists", io.grpc.Status.Code.INTERNAL);
  }
  CompleteMultipartUploadRequest request =
      new CompleteMultipartUploadRequest(bucketName, s3Key, uploadId, partETags);
  try {
    CompleteMultipartUploadResult completion = s3Client.completeMultipartUpload(request);
    LOGGER.info("upload result: {}", completion);
  } catch (AmazonS3Exception e) {
    // Only a bad request is translated; any other S3 failure propagates unchanged.
    if (e.getStatusCode() != HttpStatusCodes.STATUS_CODE_BAD_REQUEST) {
      throw e;
    }
    LOGGER.info("message: {} additional details: {}", e.getMessage(), e.getAdditionalDetails());
    throw new ModelDBException(e.getErrorMessage(), io.grpc.Status.Code.FAILED_PRECONDITION);
  }
}
/**
 * Submits one part of the multipart upload to the executor and records its future.
 *
 * <p>Part numbers are 1-based and assigned in submission order.
 *
 * @param inputStream the part's data
 * @param partSize the number of bytes in the part
 */
public void uploadPartAsync(ByteArrayInputStream inputStream,
    int partSize) {
  final int partNumber = partETagsFutures.size() + 1;
  final UploadPartRequest request = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withUploadId(uploadId)
      .withInputStream(inputStream)
      .withPartNumber(partNumber)
      .withPartSize(partSize);
  request.setGeneralProgressListener(progressListener);

  Callable<PartETag> uploadTask = new Callable<PartETag>() {
    @Override
    public PartETag call() throws Exception {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Uploading part {} for id '{}'", partNumber,
            uploadId);
      }
      return client.uploadPart(request).getPartETag();
    }
  };
  partETagsFutures.add(executorService.submit(uploadTask));
}
/**
 * Blocks until every submitted part upload has completed.
 *
 * <p>If any part upload fails, all part-upload futures are cancelled, the multipart upload is
 * aborted, and the failure is rethrown as an {@code IOException}.
 *
 * @return the ETags of all uploaded parts; never {@code null}
 * @throws java.io.InterruptedIOException if the calling thread is interrupted while waiting
 *     (the thread's interrupt flag is restored before throwing)
 * @throws IOException if any part upload failed
 */
public List<PartETag> waitForAllPartUploads() throws IOException {
  try {
    return Futures.allAsList(partETagsFutures).get();
  } catch (InterruptedException ie) {
    LOG.warn("Interrupted partUpload:" + ie, ie);
    Thread.currentThread().interrupt();
    // Previously this fell through to "return null", which callers would dereference.
    java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
        "Interrupted while waiting for multi-part upload with id '" + uploadId + "'");
    interrupted.initCause(ie);
    throw interrupted;
  } catch (ExecutionException ee) {
    //there is no way of recovering so abort
    //cancel all partUploads
    for (ListenableFuture<PartETag> future : partETagsFutures) {
      future.cancel(true);
    }
    //abort multipartupload
    this.abort();
    throw new IOException("Part upload failed in multi-part upload with " +
        "id '" +uploadId + "':" + ee, ee);
  }
}
/**
 * Completes the multipart upload, retrying on {@code AmazonClientException} for as long as
 * {@code shouldRetry} allows, so that a transient failure does not lose all uploaded data.
 *
 * @param partETags list of partial uploads
 * @return the result of the completion request
 * @throws IOException the translated last failure once retries are exhausted
 */
private CompleteMultipartUploadResult complete(List<PartETag> partETags) throws IOException {
  final String operation = String.format("Completing multi-part upload for key '%s',"
      + " id '%s' with %s partitions ",
      key, uploadId, partETags.size());
  int attempt = 0;
  while (true) {
    AmazonClientException failure;
    try {
      LOG.debug(operation);
      return writeOperationHelper.completeMultipartUpload(uploadId, partETags);
    } catch (AmazonClientException e) {
      failure = e;
    }
    if (!shouldRetry(operation, failure, attempt++)) {
      // Retries exhausted: surface the most recent failure.
      throw translateException(operation, key, failure);
    }
  }
}
/**
 * Blocks until every submitted part upload has completed.
 *
 * <p>If any part upload fails, all part-upload futures are cancelled, the multipart upload is
 * aborted, and the failure is rethrown as an {@code IOException}.
 *
 * @return the ETags of all uploaded parts; never {@code null}
 * @throws java.io.InterruptedIOException if the calling thread is interrupted while waiting
 *     (the thread's interrupt flag is restored before throwing)
 * @throws IOException if any part upload failed
 */
public List<PartETag> waitForAllPartUploads() throws IOException {
  try {
    return Futures.allAsList(partETagsFutures).get();
  } catch (InterruptedException ie) {
    LOG.warn("Interrupted partUpload:" + ie, ie);
    Thread.currentThread().interrupt();
    // Previously this fell through to "return null", which callers would dereference.
    java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
        "Interrupted while waiting for multi-part upload with id '" + uploadId + "'");
    interrupted.initCause(ie);
    throw interrupted;
  } catch (ExecutionException ee) {
    //there is no way of recovering so abort
    //cancel all partUploads
    for (ListenableFuture<PartETag> future : partETagsFutures) {
      future.cancel(true);
    }
    //abort multipartupload
    this.abort();
    throw new IOException("Part upload failed in multi-part upload with " +
        "id '" +uploadId + "':" + ee, ee);
  }
}