The following examples show how to use the com.amazonaws.services.s3.model.UploadPartResult API class; you can also follow the links to view the source code on GitHub.
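// Uploads a single part and returns its PartETag; "s3", "bucket", "key", and "bytes" are fields of the enclosing class.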
PartETag upload(String uploadId, S3Part part) {
    Object[] logParams = new Object[] { part.getSize(), part.getNumber(), bucket, key };
    log.debug("Uploading {} bytes for part {} to s3://{}/{}.", logParams);
    UploadPartRequest request = new UploadPartRequest()
        .withUploadId(uploadId)
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(part.getNumber())
        .withPartSize(part.getSize())
        .withMD5Digest(part.getMd5())
        .withInputStream(part.getInputStream());
    UploadPartResult result = s3.uploadPart(request);
    log.debug("Uploaded {} bytes for part {} to s3://{}/{}.", logParams);
    bytes += part.getSize();
    return result.getPartETag();
}
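// Mockito test for the upload() method above: captures the UploadPartRequest and checks every field copied from the S3Part.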
@Test
public void upload() {
    ArgumentCaptor<UploadPartRequest> request = ArgumentCaptor.forClass(UploadPartRequest.class);
    UploadPartResult response = mock(UploadPartResult.class);
    PartETag partETag = mock(PartETag.class);
    when(response.getPartETag()).thenReturn(partETag);
    when(s3.uploadPart(request.capture())).thenReturn(response);
    InputStream inputStream = mock(InputStream.class);
    S3Part part = new S3Part(1, 2, "md5", inputStream);
    PartETag result = underTest.upload(UPLOAD_ID, part);
    assertThat(result, is(partETag));
    assertThat(request.getValue().getBucketName(), is(BUCKET));
    assertThat(request.getValue().getKey(), is(KEY));
    assertThat(request.getValue().getPartNumber(), is(1));
    assertThat(request.getValue().getPartSize(), is(2L));
    assertThat(request.getValue().getMd5Digest(), is("md5"));
    assertThat(request.getValue().getInputStream(), is(inputStream));
}
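// Verifies the multipart behavior of an S3 VFS output stream: a 6MB write should be split into two uploaded parts.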
@Test
public void testDoGetOutputStream() throws Exception {
    InitiateMultipartUploadResult initResponse = mock( InitiateMultipartUploadResult.class );
    when( initResponse.getUploadId() ).thenReturn( "foo" );
    when( s3ServiceMock.initiateMultipartUpload( any() ) ).thenReturn( initResponse );
    UploadPartResult uploadPartResult = mock( UploadPartResult.class );
    PartETag tag = mock( PartETag.class );
    when( s3ServiceMock.uploadPart( any() ) ).thenReturn( uploadPartResult );
    when( uploadPartResult.getPartETag() ).thenReturn( tag );
    assertNotNull( s3FileObjectBucketSpy.doGetOutputStream( false ) );
    OutputStream out = s3FileObjectBucketSpy.doGetOutputStream( true );
    assertNotNull( out );
    out.write( new byte[ 1024 * 1024 * 6 ] ); // 6MB
    out.close();
    // assumes 's3.vfs.partSize' in kettle.properties lies in [5MB, 6MB), so a 6MB write yields exactly two parts
    verify( s3ServiceMock, times( 2 ) ).uploadPart( any() );
    verify( s3ServiceMock, atMost( 1 ) ).completeMultipartUpload( any() );
}
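// Flushes the buffered bytes as the next part; "uploadBuffer", "md5", "eTags", "partNumber", and "options" are fields of the enclosing writable channel.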
private void flush() throws IOException {
    uploadBuffer.flip();
    ByteArrayInputStream inputStream = new ByteArrayInputStream(uploadBuffer.array());
    UploadPartRequest request = new UploadPartRequest()
        .withBucketName(path.getBucket())
        .withKey(path.getKey())
        .withUploadId(uploadId)
        .withPartNumber(partNumber++)
        .withPartSize(uploadBuffer.remaining())
        .withMD5Digest(Base64.encodeAsString(md5.digest()))
        .withInputStream(inputStream);
    request.setSSECustomerKey(options.getSSECustomerKey());
    UploadPartResult result;
    try {
        result = amazonS3.uploadPart(request);
    } catch (AmazonClientException e) {
        throw new IOException(e);
    }
    uploadBuffer.clear();
    md5.reset();
    eTags.add(result.getPartETag());
}
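// Callable that uploads one in-memory part; intended for use with a CompletionService whose futures yield UploadPartResults.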
@Override
public UploadPartResult call() throws Exception {
    try {
        return this.amazonS3.uploadPart(new UploadPartRequest()
            .withBucketName(this.bucketName).withKey(this.key)
            .withUploadId(this.uploadId)
            .withInputStream(new ByteArrayInputStream(this.content))
            .withPartNumber(this.partNumber).withLastPart(this.last)
            .withPartSize(this.contentLength));
    }
    finally {
        // Release the memory: the callable may still live inside the
        // CompletionService, which would otherwise cause excessive memory usage.
        this.content = null;
    }
}
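// Runnable that uploads a staged part file and completes a future with the resulting PartETag.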
@Override
public void run() {
    try {
        final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, file.getInputFile(), file.getPos());
        future.complete(new PartETag(result.getPartNumber(), result.getETag()));
        file.release();
    }
    catch (Throwable t) {
        future.completeExceptionally(t);
    }
}
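// Test helper: builds an UploadPartResult that echoes the request's part number with the given ETag.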
private static UploadPartResult newResult(UploadPartRequest request,
        String etag) {
    UploadPartResult result = new UploadPartResult();
    result.setPartNumber(request.getPartNumber());
    result.setETag(etag);
    return result;
}
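// Test helper: a spied AmazonS3Client returning canned results for uploadPart and initiateMultipartUpload.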
private AmazonS3Client getMockClient() {
    AmazonS3Client mockClient = spy(AmazonS3Client.class);
    UploadPartResult uploadResult = new UploadPartResult();
    uploadResult.setETag("foo");
    doReturn(uploadResult).when(mockClient).uploadPart(any(UploadPartRequest.class));
    InitiateMultipartUploadResult initUploadResult = new InitiateMultipartUploadResult();
    initUploadResult.setUploadId("123");
    doReturn(initUploadResult).when(mockClient)
        .initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
    return mockClient;
}
/**
 * Upload part of a multi-partition file.
 * <i>Important: this call does not close any input stream in the request.</i>
 * @param request request
 * @return the result of the operation
 * @throws AmazonClientException on problems
 */
public UploadPartResult uploadPart(UploadPartRequest request)
        throws AmazonClientException {
    return mClient.uploadPart(request);
}
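// Collects one PartETag per part by draining the CompletionService of UploadPartResult futures.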
private List<PartETag> getMultiPartsUploadResults()
        throws ExecutionException, InterruptedException {
    List<PartETag> result = new ArrayList<>(this.partNumberCounter);
    for (int i = 0; i < this.partNumberCounter; i++) {
        Future<UploadPartResult> uploadPartResultFuture = this.completionService.take();
        result.add(uploadPartResultFuture.get().getPartETag());
    }
    return result;
}
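// Uploads the current write buffer as the next part, aborting the whole multipart upload if anything fails.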
private void uploadPartAndFlushBuffer() throws IOException {
    boolean operationSuccessful = false;
    if (this.uploadId == null) {
        this.uploadId = initiateMultipartUpload();
    }
    try {
        if (this.partNumber >= MAX_PART_NUMBER) {
            throw new IOException("Cannot upload any more data: maximum part number reached");
        }
        final InputStream inputStream = new InternalUploadInputStream(this.buf, this.bytesWritten);
        final UploadPartRequest request = new UploadPartRequest();
        request.setBucketName(this.bucket);
        request.setKey(this.object);
        request.setInputStream(inputStream);
        request.setUploadId(this.uploadId);
        request.setPartSize(this.bytesWritten);
        request.setPartNumber(this.partNumber++);
        final UploadPartResult result = this.s3Client.uploadPart(request);
        this.partETags.add(result.getPartETag());
        this.bytesWritten = 0;
        operationSuccessful = true;
    } catch (AmazonServiceException e) {
        throw new IOException(StringUtils.stringifyException(e));
    } finally {
        if (!operationSuccessful) {
            abortUpload();
        }
    }
}
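// close(): uploads the final buffered part, waits for asynchronous part uploads, then completes the multipart upload.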
@Override
public void close() throws IOException {
    UploadPartRequest uploadPartRequest = new UploadPartRequest()
        .withBucketName(bucketName)
        .withKey(key)
        .withPartNumber(partNumber)
        .withPartSize(outputStream.size())
        .withUploadId(multipartUpload.getUploadId())
        .withInputStream(new ByteArrayInputStream(outputStream.toByteArray()));
    UploadPartResult uploadPartResult = client.uploadPart(uploadPartRequest);
    List<PartETag> partETags;
    try {
        partETags = CompletableFutures.allAsList(pendingUploads).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IOException(e);
    }
    partETags.add(uploadPartResult.getPartETag());
    client.completeMultipartUpload(
        new CompleteMultipartUploadRequest(
            bucketName,
            key,
            multipartUpload.getUploadId(),
            partETags));
    super.close();
}
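// Benchmark/load-generator helper: writes each object either via multipart upload or a simple putObject.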
private void createKey(long counter) throws Exception {
    timer.time(() -> {
        if (multiPart) {
            final String keyName = generateObjectName(counter);
            final InitiateMultipartUploadRequest initiateRequest =
                new InitiateMultipartUploadRequest(bucketName, keyName);
            final InitiateMultipartUploadResult initiateMultipartUploadResult =
                s3.initiateMultipartUpload(initiateRequest);
            final String uploadId = initiateMultipartUploadResult.getUploadId();
            List<PartETag> parts = new ArrayList<>();
            for (int i = 1; i <= numberOfParts; i++) {
                final UploadPartRequest uploadPartRequest = new UploadPartRequest()
                    .withBucketName(bucketName)
                    .withKey(keyName)
                    .withPartNumber(i)
                    .withLastPart(i == numberOfParts)
                    .withUploadId(uploadId)
                    .withPartSize(fileSize)
                    .withInputStream(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
                final UploadPartResult uploadPartResult = s3.uploadPart(uploadPartRequest);
                parts.add(uploadPartResult.getPartETag());
            }
            s3.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucketName, keyName, uploadId, parts));
        } else {
            s3.putObject(bucketName, generateObjectName(counter), content);
        }
        return null;
    });
}
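// Test-double uploadPart: reads the part content from the staging file and stores it in memory.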
@Override
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException {
    final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(length));
    return storeAndGetUploadPartResult(key, partNumber, content);
}
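// Delegating uploadPart: builds the request through an access helper and hands off the actual upload.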
@Override
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException {
    final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
        key, uploadId, partNumber, MathUtils.checkedDownCast(length), null, inputFile, 0L);
    return s3accessHelper.uploadPart(uploadRequest);
}
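// Uploads a local file in fixed-size parts and returns a PendingUpload; aborts the multipart upload on failure.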
public static PendingUpload multipartUpload(
        AmazonS3 client, File localFile, String partition,
        String bucket, String key, long uploadPartSize) {
    InitiateMultipartUploadResult initiate = client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key));
    String uploadId = initiate.getUploadId();
    boolean threw = true;
    try {
        Map<Integer, String> etags = Maps.newLinkedHashMap();
        long offset = 0;
        long numParts = (localFile.length() / uploadPartSize +
            ((localFile.length() % uploadPartSize) > 0 ? 1 : 0));
        Preconditions.checkArgument(numParts > 0,
            "Cannot upload 0 byte file: " + localFile);
        for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
            long size = Math.min(localFile.length() - offset, uploadPartSize);
            UploadPartRequest part = new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withPartNumber(partNumber)
                .withUploadId(uploadId)
                .withFile(localFile)
                .withFileOffset(offset)
                .withPartSize(size)
                .withLastPart(partNumber == numParts);
            UploadPartResult partResult = client.uploadPart(part);
            PartETag etag = partResult.getPartETag();
            etags.put(etag.getPartNumber(), etag.getETag());
            offset += uploadPartSize;
        }
        PendingUpload pending = new PendingUpload(
            partition, bucket, key, uploadId, etags);
        threw = false;
        return pending;
    } finally {
        if (threw) {
            try {
                client.abortMultipartUpload(
                    new AbortMultipartUploadRequest(bucket, key, uploadId));
            } catch (AmazonClientException e) {
                LOG.error("Failed to abort multi-part upload", e);
            }
        }
    }
}
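// Verifies that a compressed transport buffer is uploaded as a single part with the expected bzip2 bytes.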
@Test
public void testCompressedBuffer() throws TransportException, IllegalStateException, IOException {
    /*
     * Create mock client, requests, and replies
     */
    AmazonS3Client mockClient = getMockClient();
    /*
     * Capture the InputStream into a ByteArrayOutputStream before the Transport thread closes the
     * InputStream and makes it unavailable for reading.
     */
    ByteArrayOutputStream captured = new ByteArrayOutputStream();
    Answer answer = new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
            UploadPartRequest req = invocation.getArgumentAt(0, UploadPartRequest.class);
            req.getInputStream().transferTo(captured); // InputStream.transferTo requires Java 9+
            return new UploadPartResult();
        }
    };
    Mockito.doAnswer(answer).when(mockClient).uploadPart(any(UploadPartRequest.class));
    /*
     * Fill buffer with mock data
     */
    S3TransportBuffer buffer = new S3TransportBuffer(1000, true, new S3TransportSerializer());
    InternalEvent mockIevent = mock(InternalEvent.class);
    doReturn("foo").when(mockIevent).getSerialized();
    /*
     * Create transport
     */
    Map<String, MultiPartUpload> multiPartUploads = new HashMap<String, MultiPartUpload>(0);
    S3Transport transport =
        new S3Transport(mockClient, "bucket", "basepath", true, multiPartUploads);
    /*
     * Do actual test
     */
    buffer.add(mockIevent);
    LinkedHashMap<String, String> partitions = new LinkedHashMap<String, String>();
    partitions.put(S3Transport.FILENAME_KEY, "a_filename");
    ArgumentCaptor<UploadPartRequest> argument = ArgumentCaptor.forClass(UploadPartRequest.class);
    buffer.close();
    transport.sendBatch(buffer, partitions, new TestContext());
    verify(mockClient).uploadPart(argument.capture());
    /*
     * Check results
     */
    assertEquals("bucket", argument.getValue().getBucketName());
    assertEquals("basepath/a_filename.bz2", argument.getValue().getKey());
    assertEquals(1, argument.getValue().getPartNumber());
    assertEquals(40, argument.getValue().getPartSize());
    assertEquals("123", argument.getValue().getUploadId());
    /*
     * Convert the actual InputStream from the client into a ByteArrayOutputStream which can be read
     * and verified.
     */
    byte[] actualBytes = captured.toByteArray();
    byte[] expectedBytes =
        {66, 90, 104, 57, 49, 65, 89, 38, 83, 89, 118, -10, -77, -27, 0, 0, 0, -63, 0, 0, 16, 1, 0,
            -96, 0, 48, -52, 12, -62, 12, 46, -28, -118, 112, -95, 32, -19, -19, 103, -54};
    assertArrayEquals(expectedBytes, actualBytes);
}
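// Same scenario with an uncompressed buffer: the transport compresses on send, so the uploaded bytes are identical.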
@Test
public void testCompressed() throws TransportException, IllegalStateException, IOException {
    /*
     * Create mock client, requests, and replies
     */
    AmazonS3Client mockClient = getMockClient();
    /*
     * Capture the InputStream into a ByteArrayOutputStream before the Transport thread closes the
     * InputStream and makes it unavailable for reading.
     */
    ByteArrayOutputStream captured = new ByteArrayOutputStream();
    Answer answer = new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
            UploadPartRequest req = invocation.getArgumentAt(0, UploadPartRequest.class);
            req.getInputStream().transferTo(captured); // InputStream.transferTo requires Java 9+
            return new UploadPartResult();
        }
    };
    Mockito.doAnswer(answer).when(mockClient).uploadPart(any(UploadPartRequest.class));
    /*
     * Fill buffer with mock data
     */
    S3TransportBuffer buffer = new S3TransportBuffer(1000, false, new S3TransportSerializer());
    InternalEvent mockIevent = mock(InternalEvent.class);
    doReturn("foo").when(mockIevent).getSerialized();
    /*
     * Create transport
     */
    Map<String, MultiPartUpload> multiPartUploads = new HashMap<String, MultiPartUpload>(0);
    S3Transport transport =
        new S3Transport(mockClient, "bucket", "basepath", true, multiPartUploads);
    /*
     * Do actual test
     */
    buffer.add(mockIevent);
    LinkedHashMap<String, String> partitions = new LinkedHashMap<String, String>();
    partitions.put(S3Transport.FILENAME_KEY, "a_filename");
    ArgumentCaptor<UploadPartRequest> argument = ArgumentCaptor.forClass(UploadPartRequest.class);
    buffer.close();
    transport.sendBatch(buffer, partitions, new TestContext());
    verify(mockClient).uploadPart(argument.capture());
    /*
     * Check results
     */
    assertEquals("bucket", argument.getValue().getBucketName());
    assertEquals("basepath/a_filename.bz2", argument.getValue().getKey());
    assertEquals(1, argument.getValue().getPartNumber());
    assertEquals(40, argument.getValue().getPartSize());
    assertEquals("123", argument.getValue().getUploadId());
    /*
     * Convert the actual InputStream from the client into a ByteArrayOutputStream which can be read
     * and verified.
     */
    byte[] actualBytes = captured.toByteArray();
    byte[] expectedBytes =
        {66, 90, 104, 57, 49, 65, 89, 38, 83, 89, 118, -10, -77, -27, 0, 0, 0, -63, 0, 0, 16, 1, 0,
            -96, 0, 48, -52, 12, -62, 12, 46, -28, -118, 112, -95, 32, -19, -19, 103, -54};
    assertArrayEquals(expectedBytes, actualBytes);
}
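// Verifies that an AmazonClientException thrown by uploadPart surfaces as the cause of a TransportException.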
@Test(expected = TransportException.class)
public void testAmazonClientException()
        throws TransportException, IllegalStateException, IOException {
    /*
     * Create mock client, requests, and replies
     */
    AmazonS3Client mockClient = mock(AmazonS3Client.class);
    UploadPartResult uploadResult = new UploadPartResult();
    uploadResult.setETag("foo");
    doThrow(new AmazonClientException("expected")).when(mockClient)
        .uploadPart(any(UploadPartRequest.class));
    InitiateMultipartUploadResult initUploadResult = new InitiateMultipartUploadResult();
    initUploadResult.setUploadId("123");
    doReturn(initUploadResult).when(mockClient)
        .initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
    /*
     * Fill buffer with mock data
     */
    S3TransportBuffer buffer = new S3TransportBuffer(1000, false, new S3TransportSerializer());
    InternalEvent mockIevent = mock(InternalEvent.class);
    doReturn("foo").when(mockIevent).getSerialized();
    /*
     * Create transport
     */
    Map<String, MultiPartUpload> multiPartUploads = new HashMap<String, MultiPartUpload>(0);
    S3Transport transport =
        new S3Transport(mockClient, "bucket", "basepath", false, multiPartUploads);
    /*
     * Do actual test
     */
    buffer.add(mockIevent);
    LinkedHashMap<String, String> partitions = new LinkedHashMap<String, String>();
    partitions.put(S3Transport.FILENAME_KEY, "a_filename");
    ArgumentCaptor<UploadPartRequest> argument = ArgumentCaptor.forClass(UploadPartRequest.class);
    try {
        transport.sendBatch(buffer, partitions, new TestContext());
    } catch (Exception e) {
        assertEquals(e.getCause().getClass(), AmazonClientException.class);
        throw e;
    }
}
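// Streams chunks of unknown total size as multipart parts, aborting the upload if it is never completed.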
private void uploadMultiPart(final AmazonS3 s3,
                             final String bucket,
                             final String key,
                             final InputStream firstChunk,
                             final InputStream restOfContents)
        throws IOException {
    checkState(firstChunk.available() > 0);
    String uploadId = null;
    try {
        InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key);
        uploadId = s3.initiateMultipartUpload(initiateRequest).getUploadId();
        log.debug("Starting multipart upload {} to key {} in bucket {}", uploadId, key, bucket);
        List<UploadPartResult> results = new ArrayList<>();
        for (int partNumber = 1; ; partNumber++) {
            InputStream chunk = partNumber == 1 ? firstChunk : readChunk(restOfContents);
            if (chunk.available() == 0) {
                break;
            }
            else {
                log.debug("Uploading chunk {} for {} of {} bytes", partNumber, uploadId, chunk.available());
                UploadPartRequest part = new UploadPartRequest()
                    .withBucketName(bucket)
                    .withKey(key)
                    .withUploadId(uploadId)
                    .withPartNumber(partNumber)
                    .withInputStream(chunk)
                    .withPartSize(chunk.available());
                results.add(s3.uploadPart(part));
            }
        }
        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartETags(results);
        s3.completeMultipartUpload(compRequest);
        log.debug("Upload {} complete", uploadId);
        uploadId = null;
    }
    finally {
        if (uploadId != null) {
            try {
                s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
            }
            catch (Exception e) {
                log.error("Error aborting S3 multipart upload to bucket {} with key {}", bucket, key,
                    log.isDebugEnabled() ? e : null);
            }
        }
    }
}
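// End-to-end test wiring mocked S3 clients into a local DAG run and asserting that the output file appears.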
@Test
public void testS3OutputModule() throws Exception
{
    InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
    result.setUploadId(uploadId);
    PutObjectResult objResult = new PutObjectResult();
    objResult.setETag("SuccessFullyUploaded");
    UploadPartResult partResult = new UploadPartResult();
    partResult.setPartNumber(1);
    partResult.setETag("SuccessFullyPartUploaded");
    MockitoAnnotations.initMocks(this);
    when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
    when(client.putObject(any(PutObjectRequest.class))).thenReturn(objResult);
    when(client.uploadPart(any(UploadPartRequest.class))).thenReturn(partResult);
    when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(completeMultiPart());
    Application app = new S3OutputModuleMockTest.Application();
    Configuration conf = new Configuration();
    conf.set("dt.operator.HDFSInputModule.prop.files", inputDir);
    conf.set("dt.operator.HDFSInputModule.prop.blockSize", "10");
    conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1");
    conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT", "20");
    conf.set("dt.operator.S3OutputModule.prop.accessKey", "accessKey");
    conf.set("dt.operator.S3OutputModule.prop.secretAccessKey", "secretKey");
    conf.set("dt.operator.S3OutputModule.prop.bucketName", "bucketKey");
    conf.set("dt.operator.S3OutputModule.prop.outputDirectoryPath", outputDir);
    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
    final Path outputFilePath = new Path(outDir.toString() + File.separator + FILE);
    final FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(app, conf);
    LocalMode.Controller lc = lma.getController();
    lc.setHeartbeatMonitoringEnabled(true);
    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
    {
        @Override
        public Boolean call() throws Exception
        {
            return fs.exists(outputFilePath);
        }
    });
    lc.run(10000);
    Assert.assertTrue("output file exists", fs.exists(outputFilePath));
}
/** Unsupported Operation. */
@Override public UploadPartResult uploadPart(UploadPartRequest req) throws SdkClientException {
    throw new UnsupportedOperationException("Operation not supported");
}
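// Integration test: two parts just over the minimum part size, reassembled and compared byte-for-byte.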
@Test
public void testBigMultipartUpload() throws Exception {
    String key = "multipart-upload";
    long partSize = MINIMUM_MULTIPART_SIZE;
    long size = partSize + 1;
    ByteSource byteSource = TestUtils.randomByteSource().slice(0, size);
    InitiateMultipartUploadRequest initRequest =
        new InitiateMultipartUploadRequest(containerName, key);
    InitiateMultipartUploadResult initResponse =
        client.initiateMultipartUpload(initRequest);
    String uploadId = initResponse.getUploadId();
    ByteSource byteSource1 = byteSource.slice(0, partSize);
    UploadPartRequest uploadRequest1 = new UploadPartRequest()
        .withBucketName(containerName)
        .withKey(key)
        .withUploadId(uploadId)
        .withPartNumber(1)
        .withInputStream(byteSource1.openStream())
        .withPartSize(byteSource1.size());
    uploadRequest1.getRequestClientOptions().setReadLimit((int) byteSource1.size());
    UploadPartResult uploadPartResult1 = client.uploadPart(uploadRequest1);
    ByteSource byteSource2 = byteSource.slice(partSize, size - partSize);
    UploadPartRequest uploadRequest2 = new UploadPartRequest()
        .withBucketName(containerName)
        .withKey(key)
        .withUploadId(uploadId)
        .withPartNumber(2)
        .withInputStream(byteSource2.openStream())
        .withPartSize(byteSource2.size());
    uploadRequest2.getRequestClientOptions().setReadLimit((int) byteSource2.size());
    UploadPartResult uploadPartResult2 = client.uploadPart(uploadRequest2);
    CompleteMultipartUploadRequest completeRequest =
        new CompleteMultipartUploadRequest(
            containerName, key, uploadId,
            ImmutableList.of(
                uploadPartResult1.getPartETag(),
                uploadPartResult2.getPartETag()));
    client.completeMultipartUpload(completeRequest);
    S3Object object = client.getObject(containerName, key);
    assertThat(object.getObjectMetadata().getContentLength()).isEqualTo(size);
    try (InputStream actual = object.getObjectContent();
            InputStream expected = byteSource.openStream()) {
        assertThat(actual).hasContentEqualTo(expected);
    }
}
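// Integration test: multipart upload with object metadata, verifying both content and metadata round-trip.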
@Test
public void testMultipartUpload() throws Exception {
    String blobName = "multipart-upload";
    String cacheControl = "max-age=3600";
    String contentDisposition = "attachment; filename=new.jpg";
    String contentEncoding = "gzip";
    String contentLanguage = "fr";
    String contentType = "audio/mp4";
    Map<String, String> userMetadata = ImmutableMap.of(
        "key1", "value1",
        "key2", "value2");
    ObjectMetadata metadata = new ObjectMetadata();
    if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
        metadata.setCacheControl(cacheControl);
    }
    if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
        metadata.setContentDisposition(contentDisposition);
    }
    if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
        metadata.setContentEncoding(contentEncoding);
    }
    if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
        metadata.setContentLanguage(contentLanguage);
    }
    metadata.setContentType(contentType);
    // TODO: expires
    metadata.setUserMetadata(userMetadata);
    InitiateMultipartUploadResult result = client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(containerName, blobName, metadata));
    ByteSource byteSource = TestUtils.randomByteSource().slice(
        0, MINIMUM_MULTIPART_SIZE + 1);
    ByteSource byteSource1 = byteSource.slice(0, MINIMUM_MULTIPART_SIZE);
    ByteSource byteSource2 = byteSource.slice(MINIMUM_MULTIPART_SIZE, 1);
    UploadPartResult part1 = client.uploadPart(new UploadPartRequest()
        .withBucketName(containerName)
        .withKey(blobName)
        .withUploadId(result.getUploadId())
        .withPartNumber(1)
        .withPartSize(byteSource1.size())
        .withInputStream(byteSource1.openStream()));
    UploadPartResult part2 = client.uploadPart(new UploadPartRequest()
        .withBucketName(containerName)
        .withKey(blobName)
        .withUploadId(result.getUploadId())
        .withPartNumber(2)
        .withPartSize(byteSource2.size())
        .withInputStream(byteSource2.openStream()));
    client.completeMultipartUpload(new CompleteMultipartUploadRequest(
        containerName, blobName, result.getUploadId(),
        ImmutableList.of(part1.getPartETag(), part2.getPartETag())));
    ObjectListing listing = client.listObjects(containerName);
    assertThat(listing.getObjectSummaries()).hasSize(1);
    S3Object object = client.getObject(containerName, blobName);
    try (InputStream actual = object.getObjectContent();
            InputStream expected = byteSource.openStream()) {
        assertThat(actual).hasContentEqualTo(expected);
    }
    ObjectMetadata newContentMetadata = object.getObjectMetadata();
    if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
        assertThat(newContentMetadata.getCacheControl()).isEqualTo(cacheControl);
    }
    if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
        assertThat(newContentMetadata.getContentDisposition()).isEqualTo(contentDisposition);
    }
    if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
        assertThat(newContentMetadata.getContentEncoding()).isEqualTo(contentEncoding);
    }
    if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
        assertThat(newContentMetadata.getContentLanguage()).isEqualTo(contentLanguage);
    }
    assertThat(newContentMetadata.getContentType()).isEqualTo(contentType);
    // TODO: expires
    assertThat(newContentMetadata.getUserMetadata()).isEqualTo(userMetadata);
}