下面列出了怎么用com.amazonaws.services.s3.model.InitiateMultipartUploadResult的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void complete() {
InitiateMultipartUploadResult response = mock(InitiateMultipartUploadResult.class);
when(response.getUploadId()).thenReturn(UPLOAD_ID);
when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(response);
ArgumentCaptor<CompleteMultipartUploadRequest> request = ArgumentCaptor
.forClass(CompleteMultipartUploadRequest.class);
@SuppressWarnings("unchecked")
List<PartETag> partETags = mock(List.class);
when(s3.completeMultipartUpload(request.capture())).thenReturn(null);
underTest.start();
underTest.complete(UPLOAD_ID, partETags);
assertThat(request.getValue().getBucketName(), is(BUCKET));
assertThat(request.getValue().getKey(), is(KEY));
assertThat(request.getValue().getUploadId(), is(UPLOAD_ID));
assertThat(request.getValue().getPartETags(), is(partETags));
}
@Test
public void testDoGetOutputStream() throws Exception {
InitiateMultipartUploadResult initResponse = mock( InitiateMultipartUploadResult.class );
when( initResponse.getUploadId() ).thenReturn( "foo" );
when( s3ServiceMock.initiateMultipartUpload( any() ) ).thenReturn( initResponse );
UploadPartResult uploadPartResult = mock( UploadPartResult.class );
PartETag tag = mock( PartETag.class );
when( s3ServiceMock.uploadPart( any() ) ).thenReturn( uploadPartResult );
when( uploadPartResult.getPartETag() ).thenReturn( tag );
assertNotNull( s3FileObjectBucketSpy.doGetOutputStream( false ) );
OutputStream out = s3FileObjectBucketSpy.doGetOutputStream( true );
assertNotNull( out );
out.write( new byte[ 1024 * 1024 * 6 ] ); // 6MB
out.close();
// check kettle.properties 's3.vfs.partSize' is less than [5MB, 6MB)
verify( s3ServiceMock, times( 2 ) ).uploadPart( any() );
verify( s3ServiceMock, atMost( 1 ) ).completeMultipartUpload( any() );
}
/**
* For the input file, initiate the upload and emit the UploadFileMetadata through the fileMetadataOutput,
* uploadMetadataOutput ports.
* @param tuple given tuple
*/
protected void processTuple(AbstractFileSplitter.FileMetadata tuple)
{
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
return;
}
String keyName = getKeyName(tuple.getFilePath());
String uploadId = "";
if (tuple.getNumberOfBlocks() > 1) {
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, keyName);
initRequest.setObjectMetadata(createObjectMetadata());
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
uploadId = initResponse.getUploadId();
}
UploadFileMetadata uploadFileMetadata = new UploadFileMetadata(tuple, uploadId, keyName);
fileMetadataOutput.emit(uploadFileMetadata);
uploadMetadataOutput.emit(uploadFileMetadata);
currentWindowRecoveryState.add(uploadFileMetadata);
}
@Test
public void testInitiateUpload()
{
InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
result.setUploadId(uploadId);
MockitoAnnotations.initMocks(this);
when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
when(fileMetadata.getFilePath()).thenReturn("/tmp/file1.txt");
when(fileMetadata.getNumberOfBlocks()).thenReturn(4);
S3InitiateFileUploadTest operator = new S3InitiateFileUploadTest();
operator.setBucketName("testbucket");
operator.setup(context);
CollectorTestSink<S3InitiateFileUploadOperator.UploadFileMetadata> fileSink = new CollectorTestSink<>();
CollectorTestSink<Object> tmp = (CollectorTestSink)fileSink;
operator.fileMetadataOutput.setSink(tmp);
operator.beginWindow(0);
operator.processTuple(fileMetadata);
operator.endWindow();
S3InitiateFileUploadOperator.UploadFileMetadata emitted = (S3InitiateFileUploadOperator.UploadFileMetadata)tmp.collectedTuples.get(0);
Assert.assertEquals("Upload ID :", uploadId, emitted.getUploadId());
}
@Test
public void testAtomicMpuAbort() throws Exception {
String key = "testAtomicMpuAbort";
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(BYTE_SOURCE.size());
client.putObject(containerName, key, BYTE_SOURCE.openStream(),
metadata);
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(containerName, key);
InitiateMultipartUploadResult initResponse =
client.initiateMultipartUpload(initRequest);
String uploadId = initResponse.getUploadId();
client.abortMultipartUpload(new AbortMultipartUploadRequest(
containerName, key, uploadId));
S3Object object = client.getObject(containerName, key);
assertThat(object.getObjectMetadata().getContentLength()).isEqualTo(
BYTE_SOURCE.size());
try (InputStream actual = object.getObjectContent();
InputStream expected = BYTE_SOURCE.openStream()) {
assertThat(actual).hasContentEqualTo(expected);
}
}
@Test
public void testPartNumberMarker() throws Exception {
String blobName = "foo";
InitiateMultipartUploadResult result = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(containerName, blobName));
ListPartsRequest request = new ListPartsRequest(containerName,
blobName, result.getUploadId());
client.listParts(request.withPartNumberMarker(0));
try {
client.listParts(request.withPartNumberMarker(1));
Fail.failBecauseExceptionWasNotThrown(AmazonS3Exception.class);
} catch (AmazonS3Exception e) {
assertThat(e.getErrorCode()).isEqualTo("NotImplemented");
}
}
private String initiateMultipartUpload() throws IOException {
boolean operationSuccessful = false;
final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.object);
if (this.useRRS) {
request.setStorageClass(StorageClass.ReducedRedundancy);
} else {
request.setStorageClass(StorageClass.Standard);
}
try {
final InitiateMultipartUploadResult result = this.s3Client.initiateMultipartUpload(request);
operationSuccessful = true;
return result.getUploadId();
} catch (AmazonServiceException e) {
throw new IOException(StringUtils.stringifyException(e));
} finally {
if (!operationSuccessful) {
abortUpload();
}
}
}
@Test
public void start() {
ArgumentCaptor<InitiateMultipartUploadRequest> request = ArgumentCaptor
.forClass(InitiateMultipartUploadRequest.class);
InitiateMultipartUploadResult response = mock(InitiateMultipartUploadResult.class);
when(response.getUploadId()).thenReturn(UPLOAD_ID);
when(s3.initiateMultipartUpload(request.capture())).thenReturn(response);
String result = underTest.start();
assertThat(result, is(UPLOAD_ID));
assertThat(request.getValue().getBucketName(), is(BUCKET));
assertThat(request.getValue().getKey(), is(KEY));
}
private AmazonS3Client getMockClient() {
AmazonS3Client mockClient = spy(AmazonS3Client.class);
UploadPartResult uploadResult = new UploadPartResult();
uploadResult.setETag("foo");
doReturn(uploadResult).when(mockClient).uploadPart(any(UploadPartRequest.class));
InitiateMultipartUploadResult initUploadResult = new InitiateMultipartUploadResult();
initUploadResult.setUploadId("123");
doReturn(initUploadResult).when(mockClient)
.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
return mockClient;
}
S3WritableByteChannel(AmazonS3 amazonS3, S3ResourceId path, String contentType, S3Options options)
throws IOException {
this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
this.options = checkNotNull(options);
this.path = checkNotNull(path, "path");
checkArgument(
atMostOne(
options.getSSECustomerKey() != null,
options.getSSEAlgorithm() != null,
options.getSSEAwsKeyManagementParams() != null),
"Either SSECustomerKey (SSE-C) or SSEAlgorithm (SSE-S3)"
+ " or SSEAwsKeyManagementParams (SSE-KMS) must not be set at the same time.");
// Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
checkArgument(
options.getS3UploadBufferSizeBytes()
>= S3UploadBufferSizeBytesFactory.MINIMUM_UPLOAD_BUFFER_SIZE_BYTES,
"S3UploadBufferSizeBytes must be at least %s bytes",
S3UploadBufferSizeBytesFactory.MINIMUM_UPLOAD_BUFFER_SIZE_BYTES);
this.uploadBuffer = ByteBuffer.allocate(options.getS3UploadBufferSizeBytes());
eTags = new ArrayList<>();
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentType(contentType);
if (options.getSSEAlgorithm() != null) {
objectMetadata.setSSEAlgorithm(options.getSSEAlgorithm());
}
InitiateMultipartUploadRequest request =
new InitiateMultipartUploadRequest(path.getBucket(), path.getKey())
.withStorageClass(options.getS3StorageClass())
.withObjectMetadata(objectMetadata);
request.setSSECustomerKey(options.getSSECustomerKey());
request.setSSEAwsKeyManagementParams(options.getSSEAwsKeyManagementParams());
InitiateMultipartUploadResult result;
try {
result = amazonS3.initiateMultipartUpload(request);
} catch (AmazonClientException e) {
throw new IOException(e);
}
uploadId = result.getUploadId();
}
@Override
public Optional<String> initiateMultipart(String s3Key) throws ModelDBException {
// Validate bucket
Boolean exist = doesBucketExist(bucketName);
if (!exist) {
throw new ModelDBException("Bucket does not exists", io.grpc.Status.Code.INTERNAL);
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, s3Key);
InitiateMultipartUploadResult result =
s3Client.initiateMultipartUpload(initiateMultipartUploadRequest);
return Optional.ofNullable(result.getUploadId());
}
public static void uploadFile(
final File file,
final Artifact artifact,
final CompressionType compressionType,
final EncryptionKey encryptionKey,
final AmazonS3 amazonS3,
final BuildListener listener) throws IOException {
LoggingHelper.log(listener, "Uploading artifact: " + artifact + ", file: " + file);
final String bucketName = artifact.getLocation().getS3Location().getBucketName();
final String objectKey = artifact.getLocation().getS3Location().getObjectKey();
final List<PartETag> partETags = new ArrayList<>();
final InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
bucketName,
objectKey,
createObjectMetadata(compressionType))
.withSSEAwsKeyManagementParams(toSSEAwsKeyManagementParams(encryptionKey));
final InitiateMultipartUploadResult initiateMultipartUploadResult
= amazonS3.initiateMultipartUpload(initiateMultipartUploadRequest);
final long contentLength = file.length();
long filePosition = 0;
long partSize = 5 * 1024 * 1024; // Set part size to 5 MB
for (int i = 1; filePosition < contentLength; i++) {
partSize = Math.min(partSize, (contentLength - filePosition));
final UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(objectKey)
.withUploadId(initiateMultipartUploadResult.getUploadId())
.withPartNumber(i)
.withFileOffset(filePosition)
.withFile(file)
.withPartSize(partSize);
partETags.add(amazonS3.uploadPart(uploadPartRequest).getPartETag());
filePosition += partSize;
}
final CompleteMultipartUploadRequest completeMultipartUpload
= new CompleteMultipartUploadRequest(
bucketName,
objectKey,
initiateMultipartUploadResult.getUploadId(),
partETags);
amazonS3.completeMultipartUpload(completeMultipartUpload);
LoggingHelper.log(listener, "Upload successful");
}
private void createKey(long counter) throws Exception {
timer.time(() -> {
if (multiPart) {
final String keyName = generateObjectName(counter);
final InitiateMultipartUploadRequest initiateRequest =
new InitiateMultipartUploadRequest(bucketName, keyName);
final InitiateMultipartUploadResult initiateMultipartUploadResult =
s3.initiateMultipartUpload(initiateRequest);
final String uploadId = initiateMultipartUploadResult.getUploadId();
List<PartETag> parts = new ArrayList<>();
for (int i = 1; i <= numberOfParts; i++) {
final UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(keyName)
.withPartNumber(i)
.withLastPart(i == numberOfParts)
.withUploadId(uploadId)
.withPartSize(fileSize)
.withInputStream(new ByteArrayInputStream(content.getBytes(
StandardCharsets.UTF_8)));
final UploadPartResult uploadPartResult =
s3.uploadPart(uploadPartRequest);
parts.add(uploadPartResult.getPartETag());
}
s3.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucketName, keyName, uploadId,
parts));
} else {
s3.putObject(bucketName, generateObjectName(counter),
content);
}
return null;
});
}
public static PendingUpload multipartUpload(
AmazonS3 client, File localFile, String partition,
String bucket, String key, long uploadPartSize) {
InitiateMultipartUploadResult initiate = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(bucket, key));
String uploadId = initiate.getUploadId();
boolean threw = true;
try {
Map<Integer, String> etags = Maps.newLinkedHashMap();
long offset = 0;
long numParts = (localFile.length() / uploadPartSize +
((localFile.length() % uploadPartSize) > 0 ? 1 : 0));
Preconditions.checkArgument(numParts > 0,
"Cannot upload 0 byte file: " + localFile);
for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
long size = Math.min(localFile.length() - offset, uploadPartSize);
UploadPartRequest part = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withPartNumber(partNumber)
.withUploadId(uploadId)
.withFile(localFile)
.withFileOffset(offset)
.withPartSize(size)
.withLastPart(partNumber == numParts);
UploadPartResult partResult = client.uploadPart(part);
PartETag etag = partResult.getPartETag();
etags.put(etag.getPartNumber(), etag.getETag());
offset += uploadPartSize;
}
PendingUpload pending = new PendingUpload(
partition, bucket, key, uploadId, etags);
threw = false;
return pending;
} finally {
if (threw) {
try {
client.abortMultipartUpload(
new AbortMultipartUploadRequest(bucket, key, uploadId));
} catch (AmazonClientException e) {
LOG.error("Failed to abort multi-part upload", e);
}
}
}
}
private static InitiateMultipartUploadResult newResult(
InitiateMultipartUploadRequest request, String uploadId) {
InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
result.setUploadId(uploadId);
return result;
}
@Test(expected = TransportException.class)
public void testAmazonClientException()
throws TransportException, IllegalStateException, IOException {
/*
* Create mock client, requets, and replies
*/
AmazonS3Client mockClient = mock(AmazonS3Client.class);
UploadPartResult uploadResult = new UploadPartResult();
uploadResult.setETag("foo");
doThrow(new AmazonClientException("expected")).when(mockClient)
.uploadPart(any(UploadPartRequest.class));
InitiateMultipartUploadResult initUploadResult = new InitiateMultipartUploadResult();
initUploadResult.setUploadId("123");
doReturn(initUploadResult).when(mockClient)
.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
/*
* Fill buffer with mock data
*/
S3TransportBuffer buffer = new S3TransportBuffer(1000, false, new S3TransportSerializer());
InternalEvent mockIevent = mock(InternalEvent.class);
doReturn("foo").when(mockIevent).getSerialized();
/*
* Create transport
*/
Map<String, MultiPartUpload> multiPartUploads = new HashMap<String, MultiPartUpload>(0);
S3Transport transport =
new S3Transport(mockClient, "bucket", "basepath", false, multiPartUploads);
/*
* Do actual test
*/
buffer.add(mockIevent);
LinkedHashMap<String, String> partitions = new LinkedHashMap<String, String>();
partitions.put(S3Transport.FILENAME_KEY, "a_filename");
ArgumentCaptor<UploadPartRequest> argument = ArgumentCaptor.forClass(UploadPartRequest.class);
try {
transport.sendBatch(buffer, partitions, new TestContext());
} catch (Exception e) {
assertEquals(e.getCause().getClass(), AmazonClientException.class);
throw e;
}
}
private void testMultipartCopy(S3Options options) {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(options);
S3ResourceId sourcePath = S3ResourceId.fromUri("s3://bucket/from");
S3ResourceId destinationPath = S3ResourceId.fromUri("s3://bucket/to");
InitiateMultipartUploadResult initiateMultipartUploadResult =
new InitiateMultipartUploadResult();
initiateMultipartUploadResult.setUploadId("upload-id");
if (getSSECustomerKeyMd5(options) != null) {
initiateMultipartUploadResult.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
}
when(s3FileSystem
.getAmazonS3Client()
.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
.thenReturn(initiateMultipartUploadResult);
assertEquals(
getSSECustomerKeyMd5(options),
s3FileSystem
.getAmazonS3Client()
.initiateMultipartUpload(
new InitiateMultipartUploadRequest(
destinationPath.getBucket(), destinationPath.getKey()))
.getSSECustomerKeyMd5());
ObjectMetadata sourceObjectMetadata = new ObjectMetadata();
sourceObjectMetadata.setContentLength((long) (options.getS3UploadBufferSizeBytes() * 1.5));
sourceObjectMetadata.setContentEncoding("read-seek-efficient");
if (getSSECustomerKeyMd5(options) != null) {
sourceObjectMetadata.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
}
assertGetObjectMetadata(
s3FileSystem,
createObjectMetadataRequest(sourcePath, options),
options,
sourceObjectMetadata);
CopyPartResult copyPartResult1 = new CopyPartResult();
copyPartResult1.setETag("etag-1");
CopyPartResult copyPartResult2 = new CopyPartResult();
copyPartResult1.setETag("etag-2");
if (getSSECustomerKeyMd5(options) != null) {
copyPartResult1.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
copyPartResult2.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
}
CopyPartRequest copyPartRequest = new CopyPartRequest();
copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
when(s3FileSystem.getAmazonS3Client().copyPart(any(CopyPartRequest.class)))
.thenReturn(copyPartResult1)
.thenReturn(copyPartResult2);
assertEquals(
getSSECustomerKeyMd5(options),
s3FileSystem.getAmazonS3Client().copyPart(copyPartRequest).getSSECustomerKeyMd5());
s3FileSystem.multipartCopy(sourcePath, destinationPath, sourceObjectMetadata);
verify(s3FileSystem.getAmazonS3Client(), times(1))
.completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
}
@Override
public InitiateMultipartUploadResult initiateMultipartUpload(final InitiateMultipartUploadRequest request) {
encrypter.addEncryption(request);
return super.initiateMultipartUpload(request);
}
@Test
public void testS3OutputModule() throws Exception
{
InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
result.setUploadId(uploadId);
PutObjectResult objResult = new PutObjectResult();
objResult.setETag("SuccessFullyUploaded");
UploadPartResult partResult = new UploadPartResult();
partResult.setPartNumber(1);
partResult.setETag("SuccessFullyPartUploaded");
MockitoAnnotations.initMocks(this);
when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
when(client.putObject(any(PutObjectRequest.class))).thenReturn(objResult);
when(client.uploadPart(any(UploadPartRequest.class))).thenReturn(partResult);
when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(completeMultiPart());
Application app = new S3OutputModuleMockTest.Application();
Configuration conf = new Configuration();
conf.set("dt.operator.HDFSInputModule.prop.files", inputDir);
conf.set("dt.operator.HDFSInputModule.prop.blockSize", "10");
conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1");
conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","20");
conf.set("dt.operator.S3OutputModule.prop.accessKey", "accessKey");
conf.set("dt.operator.S3OutputModule.prop.secretAccessKey", "secretKey");
conf.set("dt.operator.S3OutputModule.prop.bucketName", "bucketKey");
conf.set("dt.operator.S3OutputModule.prop.outputDirectoryPath", outputDir);
Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
final Path outputFilePath = new Path(outDir.toString() + File.separator + FILE);
final FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
LocalMode lma = LocalMode.newInstance();
lma.prepareDAG(app, conf);
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(true);
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return fs.exists(outputFilePath);
}
});
lc.run(10000);
Assert.assertTrue("output file exist", fs.exists(outputFilePath));
}
/** Unsupported Operation. */
@Override public InitiateMultipartUploadResult initiateMultipartUpload(
InitiateMultipartUploadRequest req) throws SdkClientException {
throw new UnsupportedOperationException("Operation not supported");
}
@Test
public void testMultipartCopy() throws Exception {
// B2 requires two parts to issue an MPU
assumeTrue(!blobStoreType.equals("b2"));
String sourceBlobName = "testMultipartCopy-source";
String targetBlobName = "testMultipartCopy-target";
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(BYTE_SOURCE.size());
client.putObject(containerName, sourceBlobName,
BYTE_SOURCE.openStream(), metadata);
InitiateMultipartUploadRequest initiateRequest =
new InitiateMultipartUploadRequest(containerName,
targetBlobName);
InitiateMultipartUploadResult initResult =
client.initiateMultipartUpload(initiateRequest);
String uploadId = initResult.getUploadId();
CopyPartRequest copyRequest = new CopyPartRequest()
.withDestinationBucketName(containerName)
.withDestinationKey(targetBlobName)
.withSourceBucketName(containerName)
.withSourceKey(sourceBlobName)
.withUploadId(uploadId)
.withFirstByte(0L)
.withLastByte(BYTE_SOURCE.size() - 1)
.withPartNumber(1);
CopyPartResult copyPartResult = client.copyPart(copyRequest);
CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(
containerName, targetBlobName, uploadId,
ImmutableList.of(copyPartResult.getPartETag()));
client.completeMultipartUpload(completeRequest);
S3Object object = client.getObject(containerName, targetBlobName);
assertThat(object.getObjectMetadata().getContentLength()).isEqualTo(
BYTE_SOURCE.size());
try (InputStream actual = object.getObjectContent();
InputStream expected = BYTE_SOURCE.openStream()) {
assertThat(actual).hasContentEqualTo(expected);
}
}
@Test
public void testBigMultipartUpload() throws Exception {
String key = "multipart-upload";
long partSize = MINIMUM_MULTIPART_SIZE;
long size = partSize + 1;
ByteSource byteSource = TestUtils.randomByteSource().slice(0, size);
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(containerName, key);
InitiateMultipartUploadResult initResponse =
client.initiateMultipartUpload(initRequest);
String uploadId = initResponse.getUploadId();
ByteSource byteSource1 = byteSource.slice(0, partSize);
UploadPartRequest uploadRequest1 = new UploadPartRequest()
.withBucketName(containerName)
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(1)
.withInputStream(byteSource1.openStream())
.withPartSize(byteSource1.size());
uploadRequest1.getRequestClientOptions().setReadLimit(
(int) byteSource1.size());
UploadPartResult uploadPartResult1 = client.uploadPart(uploadRequest1);
ByteSource byteSource2 = byteSource.slice(partSize, size - partSize);
UploadPartRequest uploadRequest2 = new UploadPartRequest()
.withBucketName(containerName)
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(2)
.withInputStream(byteSource2.openStream())
.withPartSize(byteSource2.size());
uploadRequest2.getRequestClientOptions().setReadLimit(
(int) byteSource2.size());
UploadPartResult uploadPartResult2 = client.uploadPart(uploadRequest2);
CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(
containerName, key, uploadId,
ImmutableList.of(
uploadPartResult1.getPartETag(),
uploadPartResult2.getPartETag()));
client.completeMultipartUpload(completeRequest);
S3Object object = client.getObject(containerName, key);
assertThat(object.getObjectMetadata().getContentLength()).isEqualTo(
size);
try (InputStream actual = object.getObjectContent();
InputStream expected = byteSource.openStream()) {
assertThat(actual).hasContentEqualTo(expected);
}
}
@Test
public void testMultipartUpload() throws Exception {
String blobName = "multipart-upload";
String cacheControl = "max-age=3600";
String contentDisposition = "attachment; filename=new.jpg";
String contentEncoding = "gzip";
String contentLanguage = "fr";
String contentType = "audio/mp4";
Map<String, String> userMetadata = ImmutableMap.of(
"key1", "value1",
"key2", "value2");
ObjectMetadata metadata = new ObjectMetadata();
if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
metadata.setCacheControl(cacheControl);
}
if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
metadata.setContentDisposition(contentDisposition);
}
if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
metadata.setContentEncoding(contentEncoding);
}
if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
metadata.setContentLanguage(contentLanguage);
}
metadata.setContentType(contentType);
// TODO: expires
metadata.setUserMetadata(userMetadata);
InitiateMultipartUploadResult result = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(containerName, blobName,
metadata));
ByteSource byteSource = TestUtils.randomByteSource().slice(
0, MINIMUM_MULTIPART_SIZE + 1);
ByteSource byteSource1 = byteSource.slice(0, MINIMUM_MULTIPART_SIZE);
ByteSource byteSource2 = byteSource.slice(MINIMUM_MULTIPART_SIZE, 1);
UploadPartResult part1 = client.uploadPart(new UploadPartRequest()
.withBucketName(containerName)
.withKey(blobName)
.withUploadId(result.getUploadId())
.withPartNumber(1)
.withPartSize(byteSource1.size())
.withInputStream(byteSource1.openStream()));
UploadPartResult part2 = client.uploadPart(new UploadPartRequest()
.withBucketName(containerName)
.withKey(blobName)
.withUploadId(result.getUploadId())
.withPartNumber(2)
.withPartSize(byteSource2.size())
.withInputStream(byteSource2.openStream()));
client.completeMultipartUpload(new CompleteMultipartUploadRequest(
containerName, blobName, result.getUploadId(),
ImmutableList.of(part1.getPartETag(), part2.getPartETag())));
ObjectListing listing = client.listObjects(containerName);
assertThat(listing.getObjectSummaries()).hasSize(1);
S3Object object = client.getObject(containerName, blobName);
try (InputStream actual = object.getObjectContent();
InputStream expected = byteSource.openStream()) {
assertThat(actual).hasContentEqualTo(expected);
}
ObjectMetadata newContentMetadata = object.getObjectMetadata();
if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
assertThat(newContentMetadata.getCacheControl()).isEqualTo(
cacheControl);
}
if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
assertThat(newContentMetadata.getContentDisposition()).isEqualTo(
contentDisposition);
}
if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
assertThat(newContentMetadata.getContentEncoding()).isEqualTo(
contentEncoding);
}
if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
assertThat(newContentMetadata.getContentLanguage()).isEqualTo(
contentLanguage);
}
assertThat(newContentMetadata.getContentType()).isEqualTo(
contentType);
// TODO: expires
assertThat(newContentMetadata.getUserMetadata()).isEqualTo(
userMetadata);
}
@Test
public void testMultipartUploadAbort() throws Exception {
String blobName = "multipart-upload-abort";
ByteSource byteSource = TestUtils.randomByteSource().slice(
0, MINIMUM_MULTIPART_SIZE);
InitiateMultipartUploadResult result = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(containerName, blobName));
// TODO: google-cloud-storage and openstack-swift cannot list multipart
// uploads
MultipartUploadListing multipartListing = client.listMultipartUploads(
new ListMultipartUploadsRequest(containerName));
if (blobStoreType.equals("azureblob")) {
// Azure does not create a manifest during initiate multi-part
// upload. Instead the first part creates this.
assertThat(multipartListing.getMultipartUploads()).isEmpty();
} else {
assertThat(multipartListing.getMultipartUploads()).hasSize(1);
}
PartListing partListing = client.listParts(new ListPartsRequest(
containerName, blobName, result.getUploadId()));
assertThat(partListing.getParts()).isEmpty();
client.uploadPart(new UploadPartRequest()
.withBucketName(containerName)
.withKey(blobName)
.withUploadId(result.getUploadId())
.withPartNumber(1)
.withPartSize(byteSource.size())
.withInputStream(byteSource.openStream()));
multipartListing = client.listMultipartUploads(
new ListMultipartUploadsRequest(containerName));
assertThat(multipartListing.getMultipartUploads()).hasSize(1);
partListing = client.listParts(new ListPartsRequest(
containerName, blobName, result.getUploadId()));
assertThat(partListing.getParts()).hasSize(1);
client.abortMultipartUpload(new AbortMultipartUploadRequest(
containerName, blobName, result.getUploadId()));
multipartListing = client.listMultipartUploads(
new ListMultipartUploadsRequest(containerName));
if (blobStoreType.equals("azureblob")) {
// Azure does not support explicit abort. It automatically
// removes incomplete multi-part uploads after 7 days.
assertThat(multipartListing.getMultipartUploads()).hasSize(1);
} else {
assertThat(multipartListing.getMultipartUploads()).isEmpty();
}
ObjectListing listing = client.listObjects(containerName);
assertThat(listing.getObjectSummaries()).isEmpty();
}
private void multipartUpload(
String key,
File file,
ObjectMetadata objectMetadata,
Optional<StorageClass> maybeStorageClass
)
throws Exception {
List<PartETag> partETags = new ArrayList<>();
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(
bucketName,
key,
objectMetadata
);
if (maybeStorageClass.isPresent()) {
initRequest.setStorageClass(maybeStorageClass.get());
}
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(
initRequest
);
long contentLength = file.length();
long partSize = configuration.getUploadPartSize();
try {
long filePosition = 0;
for (int i = 1; filePosition < contentLength; i++) {
partSize = Math.min(partSize, (contentLength - filePosition));
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(key)
.withUploadId(initResponse.getUploadId())
.withPartNumber(i)
.withFileOffset(filePosition)
.withFile(file)
.withPartSize(partSize);
partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());
filePosition += partSize;
}
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
bucketName,
key,
initResponse.getUploadId(),
partETags
);
s3Client.completeMultipartUpload(completeRequest);
} catch (Exception e) {
s3Client.abortMultipartUpload(
new AbortMultipartUploadRequest(bucketName, key, initResponse.getUploadId())
);
throw new RuntimeException(e);
}
}
@Override
public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request)
throws AmazonClientException, AmazonServiceException {
// TODO Auto-generated method stub
return null;
}
public void multipartUpload(String bucketName, String keyName, String filePath) throws IOException {
//Create a list of UploadPartResponse objects. You get one of these for each part upload.
List<PartETag> partETags = new ArrayList<PartETag>();
// Step 1: Initialize.
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, keyName);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
File file = new File(filePath);
long contentLength = file.length();
long partSize = 5 * 1024 * 1024; // Set part size to 1kb.
try {
// Step 2: Upload parts.
long filePosition = 0;
for (int i = 1; filePosition < contentLength; i++) {
// Last part can be less than 5 MB. Adjust part size.
partSize = Math.min(partSize, (contentLength - filePosition));
// Create request to upload a part.
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucketName).withKey(keyName)
.withUploadId(initResponse.getUploadId()).withPartNumber(i)
.withFileOffset(filePosition)
.withFile(file)
.withPartSize(partSize);
// Upload part and add response to our list.
partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());
filePosition += partSize;
}
// Step 3: Complete.
CompleteMultipartUploadRequest compRequest =
new CompleteMultipartUploadRequest(bucketName, keyName, initResponse.getUploadId(), partETags);
s3Client.completeMultipartUpload(compRequest);
} catch (Exception e) {
s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, keyName, initResponse.getUploadId()));
e.printStackTrace();
}
}