下面列出了 com.amazonaws.services.s3.model.CompleteMultipartUploadRequest 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that {@code complete} passes the bucket, key, upload id and part ETags through to
 * the {@link CompleteMultipartUploadRequest} handed to S3.
 */
@Test
public void complete() {
  InitiateMultipartUploadResult initiated = mock(InitiateMultipartUploadResult.class);
  when(initiated.getUploadId()).thenReturn(UPLOAD_ID);
  when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initiated);

  // Capture whatever request the code under test sends to completeMultipartUpload.
  ArgumentCaptor<CompleteMultipartUploadRequest> captor =
      ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
  when(s3.completeMultipartUpload(captor.capture())).thenReturn(null);

  @SuppressWarnings("unchecked")
  List<PartETag> expectedParts = mock(List.class);

  underTest.start();
  underTest.complete(UPLOAD_ID, expectedParts);

  CompleteMultipartUploadRequest sent = captor.getValue();
  assertThat(sent.getBucketName(), is(BUCKET));
  assertThat(sent.getKey(), is(KEY));
  assertThat(sent.getUploadId(), is(UPLOAD_ID));
  assertThat(sent.getPartETags(), is(expectedParts));
}
/**
 * Marks the channel closed, flushes if {@code uploadBuffer} still reports remaining bytes, and
 * then asks S3 to complete the multipart upload with the collected ETags.
 *
 * @throws IOException wrapping any {@link AmazonClientException} raised by the completion call
 */
@Override
public void close() throws IOException {
  open = false;

  if (uploadBuffer.remaining() > 0) {
    flush();
  }

  CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest()
      .withBucketName(path.getBucket())
      .withKey(path.getKey())
      .withUploadId(uploadId)
      .withPartETags(eTags);

  try {
    amazonS3.completeMultipartUpload(completeRequest);
  } catch (AmazonClientException clientFailure) {
    // Surface SDK failures through the channel's checked exception type.
    throw new IOException(clientFailure);
  }
}
/**
 * Completes the multipart upload {@code uploadId} for {@code s3Key} in {@code bucketName}.
 *
 * @param s3Key key of the object being assembled
 * @param uploadId id of the multipart upload to complete
 * @param partETags ETags of the uploaded parts
 * @throws ModelDBException with {@code INTERNAL} if the bucket check fails, or with
 *     {@code FAILED_PRECONDITION} if S3 rejects the completion with a bad-request status;
 *     any other {@link AmazonS3Exception} is rethrown unchanged
 */
@Override
public void commitMultipart(String s3Key, String uploadId, List<PartETag> partETags)
    throws ModelDBException {
  // Validate bucket
  if (!doesBucketExist(bucketName)) {
    throw new ModelDBException("Bucket does not exists", io.grpc.Status.Code.INTERNAL);
  }

  CompleteMultipartUploadRequest request =
      new CompleteMultipartUploadRequest(bucketName, s3Key, uploadId, partETags);
  try {
    CompleteMultipartUploadResult uploadResult = s3Client.completeMultipartUpload(request);
    LOGGER.info("upload result: {}", uploadResult);
  } catch (AmazonS3Exception e) {
    // Only a bad-request status is translated; everything else propagates as-is.
    if (e.getStatusCode() != HttpStatusCodes.STATUS_CODE_BAD_REQUEST) {
      throw e;
    }
    LOGGER.info("message: {} additional details: {}", e.getMessage(), e.getAdditionalDetails());
    throw new ModelDBException(e.getErrorMessage(), io.grpc.Status.Code.FAILED_PRECONDITION);
  }
}
/**
 * Completes the multipart upload on S3, stops the stopwatch if it is still running, and logs
 * the byte count, elapsed seconds and transfer rate.
 *
 * @param uploadId id of the multipart upload to complete
 * @param partETags ETags of the uploaded parts
 */
void complete(String uploadId, List<PartETag> partETags) {
  CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
  s3.completeMultipartUpload(request);
  if (stopwatch.isRunning()) {
    stopwatch.stop();
  }
  long seconds = stopwatch.elapsed(SECONDS);
  // The elapsed time is a whole number of seconds, so a sub-second upload reports 0 and the
  // division below would log "Infinity" (or "NaN" for zero bytes). Use a one-second floor for
  // the rate only; the logged duration itself is left untouched.
  float rate = (float) bytes / Math.max(seconds, 1L);
  log.info("Successfully uploaded {} bytes in {} seconds ({} bps) to s3://{}/{}", bytes, seconds,
      rate, bucket, key);
}
/**
 * Completes the multipart upload for this object's bucket, key and upload id.
 *
 * @param partETags ETags of the uploaded parts, passed to S3 as given
 */
public void complete(List<PartETag> partETags) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
  }
  client.completeMultipartUpload(
      new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
}
/**
 * Builds a completion request for this upload from the entries of {@code parts}, turning each
 * (part number, ETag) entry into a {@link PartETag} in the map's iteration order.
 *
 * @return a new request for this upload's bucket, key and upload id
 */
public CompleteMultipartUploadRequest newCompleteRequest() {
  List<PartETag> partETags = Lists.newArrayList();
  for (Map.Entry<Integer, String> part : parts.entrySet()) {
    PartETag partETag = new PartETag(part.getKey(), part.getValue());
    partETags.add(partETag);
  }
  return new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
}
/**
 * Collects the upload ids of the given completion requests into a set.
 *
 * @param commits completion requests to read upload ids from
 * @return a new set holding each request's upload id
 */
private static Set<String> getCommittedIds(
    List<CompleteMultipartUploadRequest> commits) {
  Set<String> uploadIds = Sets.newHashSet();
  for (CompleteMultipartUploadRequest request : commits) {
    String uploadId = request.getUploadId();
    uploadIds.add(uploadId);
  }
  return uploadIds;
}
/**
 * Sends the completion request for this multipart upload to S3.
 *
 * @param partETags ETags of the parts that make up the object
 */
public void complete(List<PartETag> partETags) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
  }
  final CompleteMultipartUploadRequest request =
      new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
  client.completeMultipartUpload(request);
}
/**
 * Finish a multipart upload by sending the completion request for {@code mBucket}/{@code key}.
 *
 * @param uploadId multipart operation Id
 * @param partETags list of partial uploads
 * @return the result returned by the S3 client
 * @throws AmazonClientException on problems
 */
CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
    List<PartETag> partETags) throws AmazonClientException {
  LOG.debug("Completing multipart upload {} with {} parts", uploadId, partETags.size());
  CompleteMultipartUploadRequest request =
      new CompleteMultipartUploadRequest(mBucket, key, uploadId, partETags);
  return mClient.completeMultipartUpload(request);
}
/**
 * Initiates a multipart upload for {@code bucket}/{@code key}, runs {@code parallelism} tasks
 * against it on {@code executorService}, and completes the upload with the part ETags the
 * tasks return.
 *
 * @param s3 client used to initiate, complete and abort the multipart upload
 * @param bucket bucket the upload targets
 * @param key key the upload targets
 * @param operations asked once per task for the function that is applied to the upload id and
 *        returns that task's part ETags
 * @throws BlobStoreException if a task was cancelled or threw; the upload is aborted first
 */
protected void parallelRequests(final AmazonS3 s3,
final String bucket,
final String key,
final Supplier<IOFunction<String, List<PartETag>>> operations)
{
InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key);
String uploadId = s3.initiateMultipartUpload(initiateRequest).getUploadId();
CompletionService<List<PartETag>> completionService = new ExecutorCompletionService<>(executorService);
try {
// Submit one task per unit of parallelism; each task fetches its own function from the supplier.
for (int i = 0; i < parallelism; i++) {
completionService.submit(() -> operations.get().apply(uploadId));
}
// Gather every task's ETags; take() blocks until the next task has finished.
List<PartETag> partETags = new ArrayList<>();
for (int i = 0; i < parallelism; i++) {
partETags.addAll(completionService.take().get());
}
s3.completeMultipartUpload(new CompleteMultipartUploadRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(uploadId)
.withPartETags(partETags));
}
catch (InterruptedException interrupted) {
// Abort the upload and restore the interrupt flag.
// NOTE(review): the method then returns normally, so the caller is not told that the upload
// was never completed - confirm this is intended.
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
Thread.currentThread().interrupt();
}
catch (CancellationException | ExecutionException ex) {
// A task failed or was cancelled: abort the upload, then report the failure with its cause.
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
throw new BlobStoreException(
format("Error executing parallel requests for bucket:%s key:%s with uploadId:%s", bucket, key, uploadId), ex,
null);
}
}
/**
 * Uploads {@code file} to the S3 location of {@code artifact} as a multipart upload, sending
 * the file in parts of at most 5 MB.
 *
 * <p>If a part upload or the completion call fails with a runtime exception, the multipart
 * upload is aborted (best effort) so that no incomplete upload is left behind, and the
 * original exception is rethrown. A failure of the abort itself is attached to the original
 * exception as a suppressed exception.
 *
 * @param file file whose contents are uploaded
 * @param artifact artifact whose S3 location supplies the bucket name and object key
 * @param compressionType passed to {@code createObjectMetadata} to build the object metadata
 * @param encryptionKey passed to {@code toSSEAwsKeyManagementParams} for the initiate request
 * @param amazonS3 client used for all S3 calls
 * @param listener receives the progress messages
 * @throws IOException declared by the signature; not thrown directly by the visible code
 */
public static void uploadFile(
        final File file,
        final Artifact artifact,
        final CompressionType compressionType,
        final EncryptionKey encryptionKey,
        final AmazonS3 amazonS3,
        final BuildListener listener) throws IOException {
    LoggingHelper.log(listener, "Uploading artifact: " + artifact + ", file: " + file);

    final String bucketName = artifact.getLocation().getS3Location().getBucketName();
    final String objectKey = artifact.getLocation().getS3Location().getObjectKey();
    final List<PartETag> partETags = new ArrayList<>();

    final InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
            bucketName,
            objectKey,
            createObjectMetadata(compressionType))
            .withSSEAwsKeyManagementParams(toSSEAwsKeyManagementParams(encryptionKey));
    final InitiateMultipartUploadResult initiateMultipartUploadResult
            = amazonS3.initiateMultipartUpload(initiateMultipartUploadRequest);
    final String uploadId = initiateMultipartUploadResult.getUploadId();

    try {
        final long contentLength = file.length();
        long filePosition = 0;
        long partSize = 5 * 1024 * 1024; // Set part size to 5 MB

        for (int i = 1; filePosition < contentLength; i++) {
            // The last part may be shorter than the nominal part size.
            partSize = Math.min(partSize, (contentLength - filePosition));
            final UploadPartRequest uploadPartRequest = new UploadPartRequest()
                    .withBucketName(bucketName)
                    .withKey(objectKey)
                    .withUploadId(uploadId)
                    .withPartNumber(i)
                    .withFileOffset(filePosition)
                    .withFile(file)
                    .withPartSize(partSize);
            partETags.add(amazonS3.uploadPart(uploadPartRequest).getPartETag());
            filePosition += partSize;
        }

        final CompleteMultipartUploadRequest completeMultipartUpload
                = new CompleteMultipartUploadRequest(
                        bucketName,
                        objectKey,
                        uploadId,
                        partETags);
        amazonS3.completeMultipartUpload(completeMultipartUpload);
    } catch (final RuntimeException uploadFailure) {
        // Without an abort, the parts uploaded so far stay stored under the open upload id.
        // Fully qualified so this block does not depend on an import it cannot see.
        try {
            amazonS3.abortMultipartUpload(
                    new com.amazonaws.services.s3.model.AbortMultipartUploadRequest(
                            bucketName, objectKey, uploadId));
        } catch (final RuntimeException abortFailure) {
            // Keep the original failure as the primary error.
            uploadFailure.addSuppressed(abortFailure);
        }
        throw uploadFailure;
    }

    LoggingHelper.log(listener, "Upload successful");
}
/**
 * Compresses the test project, uploads it through {@code PublisherTools.uploadFile} with a
 * mocked S3 client, and verifies the S3 call sequence, the log output and the KMS encryption
 * parameters on the initiate request.
 */
@Test
public void uploadFileSuccess() throws IOException {
TestUtils.initializeTestingFolders();
final File compressedFile = CompressionTools.compressFile(
"ZipProject",
PATH_TO_COMPRESS,
CompressionType.Zip,
null);
PublisherTools.uploadFile(
compressedFile,
mockArtifact,
CompressionType.Zip,
null, // No custom encryption key
mockS3Client,
null); // Listener
// The S3 calls must happen in this order: initiate, upload part, complete.
final InOrder inOrder = inOrder(mockS3Client);
inOrder.verify(mockS3Client, times(1)).initiateMultipartUpload(initiateCaptor.capture());
// Total size is less than 5MB, should only be one upload
inOrder.verify(mockS3Client, times(1)).uploadPart(any(UploadPartRequest.class));
inOrder.verify(mockS3Client, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
assertContainsIgnoreCase("[AWS CodePipeline Plugin] Uploading artifact:", outContent.toString());
assertContainsIgnoreCase("[AWS CodePipeline Plugin] Upload successful", outContent.toString());
// With no custom key, the request must still ask for aws:kms encryption, with no key id set.
final InitiateMultipartUploadRequest request = initiateCaptor.getValue();
final SSEAwsKeyManagementParams encryptionParams = request.getSSEAwsKeyManagementParams();
assertNotNull(encryptionParams);
assertNull(encryptionParams.getAwsKmsKeyId());
assertEquals("aws:kms", encryptionParams.getEncryption());
// NOTE(review): cleanup is skipped if an assertion above fails - consider a finally block.
compressedFile.delete();
TestUtils.cleanUpTestingFolders();
}
/**
 * Marks a file as uploaded once all of its blocks have been uploaded, sending the
 * CompleteMultipartUploadRequest to S3 first when the file has more than one part.
 * @param keyName file to upload into S3
 */
private void verifyAndEmitFileMerge(String keyName)
{
  // Nothing to do for windows that have already completed.
  if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
    return;
  }

  S3InitiateFileUploadOperator.UploadFileMetadata metadata = fileMetadatas.get(keyName);
  List<PartETag> etags = uploadParts.get(keyName);
  if (etags == null || metadata == null) {
    return;
  }
  // Wait until an ETag has been recorded for every block of the file.
  if (metadata.getFileMetadata().getNumberOfBlocks() != etags.size()) {
    return;
  }

  boolean uploaded;
  if (etags.size() <= 1) {
    // At most one part: no completion request is sent.
    uploaded = true;
  } else {
    CompleteMultipartUploadResult result = s3Client.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucketName, keyName, metadata.getUploadId(), etags));
    uploaded = result.getETag() != null;
  }

  if (uploaded) {
    uploadedFiles.add(keyName);
    LOG.debug("Uploaded file {} successfully", keyName);
  }
}
/**
 * Uploads the bytes held in {@code outputStream} as one more part, waits for the pending part
 * uploads, completes the multipart upload, and then delegates to {@code super.close()}.
 *
 * @throws IOException if waiting for the pending uploads is interrupted or one of them failed
 */
@Override
public void close() throws IOException {
UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(key)
.withPartNumber(partNumber)
.withPartSize(outputStream.size())
.withUploadId(multipartUpload.getUploadId())
.withInputStream(new ByteArrayInputStream(outputStream.toByteArray()));
UploadPartResult uploadPartResult = client.uploadPart(uploadPartRequest);
List<PartETag> partETags;
try {
// Blocks until every pending upload has produced its ETag.
partETags = CompletableFutures.allAsList(pendingUploads).get();
} catch (InterruptedException | ExecutionException e) {
// NOTE(review): the interrupt flag is not restored when wrapping InterruptedException - confirm.
throw new IOException(e);
}
// The part uploaded above is appended after the pending ones.
// NOTE(review): assumes the list returned by allAsList(...).get() is mutable - verify.
partETags.add(uploadPartResult.getPartETag());
client.completeMultipartUpload(
new CompleteMultipartUploadRequest(
bucketName,
key,
multipartUpload.getUploadId(),
partETags)
);
super.close();
}
/**
 * Creates one object under the timer: a single {@code putObject} call, or, when
 * {@code multiPart} is set, a multipart upload of {@code numberOfParts} parts that each carry
 * the bytes of {@code content}.
 *
 * @param counter value used to generate the object name
 */
private void createKey(long counter) throws Exception {
  timer.time(() -> {
    if (!multiPart) {
      s3.putObject(bucketName, generateObjectName(counter),
          content);
      return null;
    }

    final String keyName = generateObjectName(counter);
    final InitiateMultipartUploadResult initiated =
        s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, keyName));
    final String uploadId = initiated.getUploadId();

    List<PartETag> parts = new ArrayList<>();
    for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
      final boolean lastPart = partNumber == numberOfParts;
      final UploadPartRequest partRequest = new UploadPartRequest()
          .withBucketName(bucketName)
          .withKey(keyName)
          .withPartNumber(partNumber)
          .withLastPart(lastPart)
          .withUploadId(uploadId)
          .withPartSize(fileSize)
          .withInputStream(new ByteArrayInputStream(content.getBytes(
              StandardCharsets.UTF_8)));
      parts.add(s3.uploadPart(partRequest).getPartETag());
    }

    s3.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucketName, keyName, uploadId, parts));
    return null;
  });
}
/**
 * Runs a map-only MapReduce job that writes through {@code S3TextOutputFormat} against a mocked
 * S3 file system, then reads back the recorded client results and checks that only commits
 * happened and that they target the expected paths.
 */
@Test
public void testMRJob() throws Exception {
// The test only works when the output path resolves to the mock file system.
FileSystem mockS3 = mock(FileSystem.class);
FileSystem s3 = S3_OUTPUT_PATH.getFileSystem(getConfiguration());
if (s3 instanceof MockS3FileSystem) {
((MockS3FileSystem) s3).setMock(mockS3);
} else {
throw new RuntimeException("Cannot continue: S3 not mocked");
}
String commitUUID = UUID.randomUUID().toString();
int numFiles = 3;
// Create the input files and record the output path expected for each of them.
Set<String> expectedFiles = Sets.newHashSet();
for (int i = 0; i < numFiles; i += 1) {
File file = temp.newFile(String.valueOf(i) + ".text");
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
}
expectedFiles.add(new Path(
S3_OUTPUT_PATH, "part-m-0000" + i + "-" + commitUUID).toString());
}
Job mrJob = Job.getInstance(MR_CLUSTER.getConfig(), "test-committer-job");
Configuration conf = mrJob.getConfiguration();
mrJob.setOutputFormatClass(S3TextOutputFormat.class);
S3TextOutputFormat.setOutputPath(mrJob, S3_OUTPUT_PATH);
// Reserve a temp file name and delete the file; only its path is handed to the job.
// presumably the committer writes its serialized results there - verify against the committer.
File mockResultsFile = temp.newFile("committer.bin");
mockResultsFile.delete();
String committerPath = "file:" + mockResultsFile;
conf.set("mock-results-file", committerPath);
conf.set(UPLOAD_UUID, commitUUID);
mrJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(mrJob,
new Path("file:" + temp.getRoot().toString()));
mrJob.setMapperClass(M.class);
// Map-only job: no reducers.
mrJob.setNumReduceTasks(0);
mrJob.submit();
Assert.assertTrue("MR job should succeed", mrJob.waitForCompletion(true));
// Read the recorded client results back from the local file system.
TestUtil.ClientResults results;
try (ObjectInputStream in = new ObjectInputStream(
FileSystem.getLocal(conf).open(new Path(committerPath)))) {
results = (TestUtil.ClientResults) in.readObject();
}
Assert.assertEquals("Should not delete files",
0, results.deletes.size());
Assert.assertEquals("Should not abort commits",
0, results.aborts.size());
Assert.assertEquals("Should commit task output files",
numFiles, results.commits.size());
// Rebuild s3:// URIs from the committed requests and compare them with the expected set.
Set<String> actualFiles = Sets.newHashSet();
for (CompleteMultipartUploadRequest commit : results.commits) {
actualFiles.add("s3://" + commit.getBucketName() + "/" + commit.getKey());
}
Assert.assertEquals("Should commit the correct file paths",
expectedFiles, actualFiles);
}
/**
 * Makes the job committer fail on commit 5 and checks that the failure propagates, that the
 * uploads committed before the failure are deleted again, that the remaining uploads are
 * aborted, and that the job attempt path is removed.
 */
@Test
public void testJobCommitFailure() throws Exception {
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
FileSystem fs = jobAttemptPath.getFileSystem(conf);
// presumably 4 tasks with 3 files each, giving the 12 uploads accounted for below (5 + 7)
// - confirm against runTasks.
Set<String> uploads = runTasks(job, 4, 3);
Assert.assertTrue(fs.exists(jobAttemptPath));
jobCommitter.errors.failOnCommit(5);
TestUtil.assertThrows("Should propagate the commit failure",
AmazonClientException.class, "Fail on commit 5", new Callable<Void>() {
@Override
public Void call() throws IOException {
jobCommitter.commitJob(job);
return null;
}
});
Assert.assertEquals("Should have succeeded to commit some uploads",
5, jobCommitter.results.getCommits().size());
Assert.assertEquals("Should have deleted the files that succeeded",
5, jobCommitter.results.getDeletes().size());
// Every committed object must also have been deleted: compare them by bucket + key.
Set<String> commits = Sets.newHashSet();
for (CompleteMultipartUploadRequest commit : jobCommitter.results.getCommits()) {
commits.add(commit.getBucketName() + commit.getKey());
}
Set<String> deletes = Sets.newHashSet();
for (DeleteObjectRequest delete : jobCommitter.results.getDeletes()) {
deletes.add(delete.getBucketName() + delete.getKey());
}
Assert.assertEquals("Committed and deleted objects should match",
commits, deletes);
Assert.assertEquals("Should have aborted the remaining uploads",
7, jobCommitter.results.getAborts().size());
// Committed ids plus aborted ids must cover exactly the uploads the tasks created.
Set<String> uploadIds = getCommittedIds(jobCommitter.results.getCommits());
uploadIds.addAll(getAbortedIds(jobCommitter.results.getAborts()));
Assert.assertEquals("Should have committed/deleted or aborted all uploads",
uploads, uploadIds);
Assert.assertFalse(fs.exists(jobAttemptPath));
}
/**
 * Returns the {@code commits} list held by this object (the list itself, not a copy).
 *
 * @return the recorded completion requests
 */
public List<CompleteMultipartUploadRequest> getCommits() {
  return this.commits;
}
/**
 * Creates an empty result for a completion request; the request itself is not inspected.
 *
 * @param req the completion request (unused)
 * @return a new, unpopulated {@link CompleteMultipartUploadResult}
 */
private static CompleteMultipartUploadResult newResult(
    CompleteMultipartUploadRequest req) {
  CompleteMultipartUploadResult emptyResult = new CompleteMultipartUploadResult();
  return emptyResult;
}
/**
 * Completes the multipart upload for this object's bucket, key and upload id using the part
 * ETags collected in {@code partETags}.
 */
public void complete() {
  log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
  s3.completeMultipartUpload(
      new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
}
/**
 * Builds a new completion request from this object's bucket name, key, upload id and part ETags.
 *
 * @return a freshly constructed {@link CompleteMultipartUploadRequest}
 */
public CompleteMultipartUploadRequest getCompleteMultipartUploadRequest() {
  CompleteMultipartUploadRequest request =
      new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
  return request;
}
/**
 * Exercises {@code S3FileSystem.multipartCopy} against a mocked S3 client and verifies that
 * exactly one multipart upload is completed.
 *
 * <p>The source object is sized at 1.5x the upload buffer size. The mock is stubbed to return
 * two distinct copy-part results ("etag-1", then "etag-2").
 *
 * @param options S3 options used to build the mocked file system and the expected SSE
 *     customer key MD5
 */
private void testMultipartCopy(S3Options options) {
  S3FileSystem s3FileSystem = buildMockedS3FileSystem(options);

  S3ResourceId sourcePath = S3ResourceId.fromUri("s3://bucket/from");
  S3ResourceId destinationPath = S3ResourceId.fromUri("s3://bucket/to");

  InitiateMultipartUploadResult initiateMultipartUploadResult =
      new InitiateMultipartUploadResult();
  initiateMultipartUploadResult.setUploadId("upload-id");
  if (getSSECustomerKeyMd5(options) != null) {
    initiateMultipartUploadResult.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
  }
  when(s3FileSystem
          .getAmazonS3Client()
          .initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
      .thenReturn(initiateMultipartUploadResult);
  // Sanity-check the stub: the mocked initiate call reports the expected key MD5.
  assertEquals(
      getSSECustomerKeyMd5(options),
      s3FileSystem
          .getAmazonS3Client()
          .initiateMultipartUpload(
              new InitiateMultipartUploadRequest(
                  destinationPath.getBucket(), destinationPath.getKey()))
          .getSSECustomerKeyMd5());

  ObjectMetadata sourceObjectMetadata = new ObjectMetadata();
  sourceObjectMetadata.setContentLength((long) (options.getS3UploadBufferSizeBytes() * 1.5));
  sourceObjectMetadata.setContentEncoding("read-seek-efficient");
  if (getSSECustomerKeyMd5(options) != null) {
    sourceObjectMetadata.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
  }
  assertGetObjectMetadata(
      s3FileSystem,
      createObjectMetadataRequest(sourcePath, options),
      options,
      sourceObjectMetadata);

  CopyPartResult copyPartResult1 = new CopyPartResult();
  copyPartResult1.setETag("etag-1");
  CopyPartResult copyPartResult2 = new CopyPartResult();
  // Set the second result's ETag; previously this overwrote copyPartResult1's ETag and left
  // copyPartResult2 without one.
  copyPartResult2.setETag("etag-2");
  if (getSSECustomerKeyMd5(options) != null) {
    copyPartResult1.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
    copyPartResult2.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
  }
  CopyPartRequest copyPartRequest = new CopyPartRequest();
  copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
  when(s3FileSystem.getAmazonS3Client().copyPart(any(CopyPartRequest.class)))
      .thenReturn(copyPartResult1)
      .thenReturn(copyPartResult2);
  // This direct call consumes the first stubbed answer (copyPartResult1).
  assertEquals(
      getSSECustomerKeyMd5(options),
      s3FileSystem.getAmazonS3Client().copyPart(copyPartRequest).getSSECustomerKeyMd5());

  s3FileSystem.multipartCopy(sourcePath, destinationPath, sourceObjectMetadata);

  verify(s3FileSystem.getAmazonS3Client(), times(1))
      .completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
}
/**
 * Uploads {@code firstChunk} followed by further chunks read from {@code restOfContents} as a
 * multipart upload to {@code bucket}/{@code key}. If anything fails after the upload has been
 * initiated, the upload is aborted (best effort) before the failure propagates.
 *
 * @param s3 client used for all S3 calls
 * @param bucket destination bucket
 * @param key destination key
 * @param firstChunk first part's data; must report at least one available byte
 * @param restOfContents stream from which the remaining chunks are read via {@code readChunk}
 * @throws IOException if reading a chunk fails
 */
private void uploadMultiPart(final AmazonS3 s3,
final String bucket,
final String key,
final InputStream firstChunk,
final InputStream restOfContents)
throws IOException {
checkState(firstChunk.available() > 0);
// Non-null only while an initiated upload still needs completing or aborting.
String uploadId = null;
try {
InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key);
uploadId = s3.initiateMultipartUpload(initiateRequest).getUploadId();
log.debug("Starting multipart upload {} to key {} in bucket {}", uploadId, key, bucket);
List<UploadPartResult> results = new ArrayList<>();
for (int partNumber = 1; ; partNumber++) {
InputStream chunk = partNumber == 1 ? firstChunk : readChunk(restOfContents);
// A chunk with nothing available ends the loop.
if (chunk.available() == 0) {
break;
}
else {
log.debug("Uploading chunk {} for {} of {} bytes", partNumber, uploadId, chunk.available());
// NOTE(review): available() is used as the part size - this assumes each chunk is fully
// buffered in memory; confirm against readChunk.
UploadPartRequest part = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withInputStream(chunk)
.withPartSize(chunk.available());
results.add(s3.uploadPart(part));
}
}
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(uploadId)
.withPartETags(results);
s3.completeMultipartUpload(compRequest);
log.debug("Upload {} complete", uploadId);
// Clearing the id tells the finally block that no abort is needed.
uploadId = null;
}
finally {
if (uploadId != null) {
try {
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
}
catch(Exception e) {
// An abort failure is only logged (with its stack trace at debug level) so that it does not
// replace the exception that caused the abort.
log.error("Error aborting S3 multipart upload to bucket {} with key {}", bucket, key,
log.isDebugEnabled() ? e : null);
}
}
}
}
/**
 * Copies {@code sourcePath} to {@code destinationPath} within {@code bucket} as a multipart
 * copy, in byte ranges of at most {@code chunkSize}. On an {@link SdkClientException} the
 * upload is aborted (best effort) and the exception is rethrown.
 *
 * @param s3 client used for all S3 calls
 * @param bucket bucket holding both the source and the destination
 * @param sourcePath key of the object to copy
 * @param destinationPath key of the copy
 * @param length number of bytes to copy; must be positive
 */
private void copyMultiPart(final AmazonS3 s3,
final String bucket,
final String sourcePath,
final String destinationPath,
final long length) {
checkState(length > 0);
String uploadId = null;
try {
long remaining = length;
long offset = 0;
InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, destinationPath);
uploadId = s3.initiateMultipartUpload(initiateRequest).getUploadId();
log.debug("Starting multipart copy {} to key {} from key {}", uploadId, destinationPath, sourcePath);
List<CopyPartResult> results = new ArrayList<>();
for (int partNumber = 1; ; partNumber++) {
if (remaining <= 0) {
break;
}
else {
// The final part covers whatever is left, which may be less than chunkSize.
long partSize = min(remaining, chunkSize);
log.trace("Copying chunk {} for {} from byte {} to {}, size {}", partNumber, uploadId, offset,
offset + partSize - 1, partSize);
// The byte range is inclusive, hence the -1 on the last byte.
CopyPartRequest part = new CopyPartRequest()
.withSourceBucketName(bucket)
.withSourceKey(sourcePath)
.withDestinationBucketName(bucket)
.withDestinationKey(destinationPath)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withFirstByte(offset)
.withLastByte(offset + partSize - 1);
results.add(s3.copyPart(part));
offset += partSize;
remaining -= partSize;
}
}
// Turn each copy result into the PartETag the completion request expects.
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest()
.withBucketName(bucket)
.withKey(destinationPath)
.withUploadId(uploadId)
.withPartETags(results.stream().map(r -> new PartETag(r.getPartNumber(), r.getETag())).collect(toList()));
s3.completeMultipartUpload(compRequest);
log.debug("Copy {} complete", uploadId);
}
catch(SdkClientException e) {
// uploadId is null only if initiating the upload itself failed; then there is nothing to abort.
if (uploadId != null) {
try {
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, destinationPath, uploadId));
}
catch(Exception inner) {
// An abort failure is only logged so that the original exception is the one rethrown.
log.error("Error aborting S3 multipart copy to bucket {} with key {}", bucket, destinationPath,
log.isDebugEnabled() ? inner : null);
}
}
throw e;
}
}
/**
 * Runs the S3 output module application in local mode against a mocked S3 client and checks
 * that the expected output file exists afterwards.
 */
@Test
public void testS3OutputModule() throws Exception
{
// Canned responses returned by the mocked S3 client.
InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
result.setUploadId(uploadId);
PutObjectResult objResult = new PutObjectResult();
objResult.setETag("SuccessFullyUploaded");
UploadPartResult partResult = new UploadPartResult();
partResult.setPartNumber(1);
partResult.setETag("SuccessFullyPartUploaded");
// The mocks must be initialised before they are stubbed below.
MockitoAnnotations.initMocks(this);
when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
when(client.putObject(any(PutObjectRequest.class))).thenReturn(objResult);
when(client.uploadPart(any(UploadPartRequest.class))).thenReturn(partResult);
when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(completeMultiPart());
Application app = new S3OutputModuleMockTest.Application();
Configuration conf = new Configuration();
conf.set("dt.operator.HDFSInputModule.prop.files", inputDir);
conf.set("dt.operator.HDFSInputModule.prop.blockSize", "10");
conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1");
conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","20");
conf.set("dt.operator.S3OutputModule.prop.accessKey", "accessKey");
conf.set("dt.operator.S3OutputModule.prop.secretAccessKey", "secretKey");
conf.set("dt.operator.S3OutputModule.prop.bucketName", "bucketKey");
conf.set("dt.operator.S3OutputModule.prop.outputDirectoryPath", outputDir);
Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
final Path outputFilePath = new Path(outDir.toString() + File.separator + FILE);
// NOTE(review): this FileSystem instance is never closed - confirm that is acceptable here.
final FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
LocalMode lma = LocalMode.newInstance();
lma.prepareDAG(app, conf);
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(true);
// The exit condition reports true once the output file exists.
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return fs.exists(outputFilePath);
}
});
lc.run(10000);
Assert.assertTrue("output file exist", fs.exists(outputFilePath));
}
/**
 * Unsupported operation: this implementation does not complete multipart uploads.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public CompleteMultipartUploadResult completeMultipartUpload(
    CompleteMultipartUploadRequest req) throws SdkClientException {
  throw new UnsupportedOperationException("Operation not supported");
}
/**
 * Puts a source blob, copies it into a target blob through a single-part multipart copy, and
 * checks the target's length and content against the source.
 */
@Test
public void testMultipartCopy() throws Exception {
  // B2 requires two parts to issue an MPU
  assumeTrue(!blobStoreType.equals("b2"));

  String sourceKey = "testMultipartCopy-source";
  String targetKey = "testMultipartCopy-target";

  // Store the source blob.
  ObjectMetadata sourceMetadata = new ObjectMetadata();
  sourceMetadata.setContentLength(BYTE_SOURCE.size());
  client.putObject(containerName, sourceKey, BYTE_SOURCE.openStream(), sourceMetadata);

  // Start the multipart upload that will receive the copy.
  String uploadId = client.initiateMultipartUpload(
      new InitiateMultipartUploadRequest(containerName, targetKey)).getUploadId();

  // Copy the whole source as part 1; the byte range is inclusive.
  CopyPartRequest copyPart = new CopyPartRequest()
      .withDestinationBucketName(containerName)
      .withDestinationKey(targetKey)
      .withSourceBucketName(containerName)
      .withSourceKey(sourceKey)
      .withUploadId(uploadId)
      .withFirstByte(0L)
      .withLastByte(BYTE_SOURCE.size() - 1)
      .withPartNumber(1);
  CopyPartResult copied = client.copyPart(copyPart);

  client.completeMultipartUpload(new CompleteMultipartUploadRequest(
      containerName, targetKey, uploadId,
      ImmutableList.of(copied.getPartETag())));

  S3Object target = client.getObject(containerName, targetKey);
  assertThat(target.getObjectMetadata().getContentLength()).isEqualTo(
      BYTE_SOURCE.size());
  try (InputStream actual = target.getObjectContent();
      InputStream expected = BYTE_SOURCE.openStream()) {
    assertThat(actual).hasContentEqualTo(expected);
  }
}
/**
 * Uploads an object of {@code MINIMUM_MULTIPART_SIZE + 1} bytes as two parts (one full-size
 * part plus a one-byte part) and checks the stored object's length and content.
 */
@Test
public void testBigMultipartUpload() throws Exception {
String key = "multipart-upload";
long partSize = MINIMUM_MULTIPART_SIZE;
// One byte more than a single part, so a second part is required.
long size = partSize + 1;
ByteSource byteSource = TestUtils.randomByteSource().slice(0, size);
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(containerName, key);
InitiateMultipartUploadResult initResponse =
client.initiateMultipartUpload(initRequest);
String uploadId = initResponse.getUploadId();
// Part 1: the first partSize bytes.
ByteSource byteSource1 = byteSource.slice(0, partSize);
UploadPartRequest uploadRequest1 = new UploadPartRequest()
.withBucketName(containerName)
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(1)
.withInputStream(byteSource1.openStream())
.withPartSize(byteSource1.size());
// Set the request's read limit to the part size.
// presumably so the client can buffer the whole stream for a retry - verify against the SDK docs.
uploadRequest1.getRequestClientOptions().setReadLimit(
(int) byteSource1.size());
UploadPartResult uploadPartResult1 = client.uploadPart(uploadRequest1);
// Part 2: the remaining byte.
ByteSource byteSource2 = byteSource.slice(partSize, size - partSize);
UploadPartRequest uploadRequest2 = new UploadPartRequest()
.withBucketName(containerName)
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(2)
.withInputStream(byteSource2.openStream())
.withPartSize(byteSource2.size());
uploadRequest2.getRequestClientOptions().setReadLimit(
(int) byteSource2.size());
UploadPartResult uploadPartResult2 = client.uploadPart(uploadRequest2);
CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(
containerName, key, uploadId,
ImmutableList.of(
uploadPartResult1.getPartETag(),
uploadPartResult2.getPartETag()));
client.completeMultipartUpload(completeRequest);
// The assembled object must match the original source in both length and content.
S3Object object = client.getObject(containerName, key);
assertThat(object.getObjectMetadata().getContentLength()).isEqualTo(
size);
try (InputStream actual = object.getObjectContent();
InputStream expected = byteSource.openStream()) {
assertThat(actual).hasContentEqualTo(expected);
}
}
/**
 * Uploads a two-part object with content and user metadata set on the initiate request, then
 * checks that the container holds exactly one object and that its content and metadata match.
 * Metadata fields listed in a {@code Quirks} set for the current blob store type are neither
 * set nor asserted.
 */
@Test
public void testMultipartUpload() throws Exception {
String blobName = "multipart-upload";
String cacheControl = "max-age=3600";
String contentDisposition = "attachment; filename=new.jpg";
String contentEncoding = "gzip";
String contentLanguage = "fr";
String contentType = "audio/mp4";
Map<String, String> userMetadata = ImmutableMap.of(
"key1", "value1",
"key2", "value2");
// Only set the metadata fields that the current blob store type is not excluded from.
ObjectMetadata metadata = new ObjectMetadata();
if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
metadata.setCacheControl(cacheControl);
}
if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
metadata.setContentDisposition(contentDisposition);
}
if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
metadata.setContentEncoding(contentEncoding);
}
if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
metadata.setContentLanguage(contentLanguage);
}
metadata.setContentType(contentType);
// TODO: expires
metadata.setUserMetadata(userMetadata);
InitiateMultipartUploadResult result = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(containerName, blobName,
metadata));
// Split MINIMUM_MULTIPART_SIZE + 1 bytes into one full-size part and a one-byte part.
ByteSource byteSource = TestUtils.randomByteSource().slice(
0, MINIMUM_MULTIPART_SIZE + 1);
ByteSource byteSource1 = byteSource.slice(0, MINIMUM_MULTIPART_SIZE);
ByteSource byteSource2 = byteSource.slice(MINIMUM_MULTIPART_SIZE, 1);
UploadPartResult part1 = client.uploadPart(new UploadPartRequest()
.withBucketName(containerName)
.withKey(blobName)
.withUploadId(result.getUploadId())
.withPartNumber(1)
.withPartSize(byteSource1.size())
.withInputStream(byteSource1.openStream()));
UploadPartResult part2 = client.uploadPart(new UploadPartRequest()
.withBucketName(containerName)
.withKey(blobName)
.withUploadId(result.getUploadId())
.withPartNumber(2)
.withPartSize(byteSource2.size())
.withInputStream(byteSource2.openStream()));
client.completeMultipartUpload(new CompleteMultipartUploadRequest(
containerName, blobName, result.getUploadId(),
ImmutableList.of(part1.getPartETag(), part2.getPartETag())));
// Completing the upload must leave exactly one object in the container.
ObjectListing listing = client.listObjects(containerName);
assertThat(listing.getObjectSummaries()).hasSize(1);
S3Object object = client.getObject(containerName, blobName);
try (InputStream actual = object.getObjectContent();
InputStream expected = byteSource.openStream()) {
assertThat(actual).hasContentEqualTo(expected);
}
// Check the same metadata fields that were set above.
ObjectMetadata newContentMetadata = object.getObjectMetadata();
if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
assertThat(newContentMetadata.getCacheControl()).isEqualTo(
cacheControl);
}
if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
assertThat(newContentMetadata.getContentDisposition()).isEqualTo(
contentDisposition);
}
if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
assertThat(newContentMetadata.getContentEncoding()).isEqualTo(
contentEncoding);
}
if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
assertThat(newContentMetadata.getContentLanguage()).isEqualTo(
contentLanguage);
}
assertThat(newContentMetadata.getContentType()).isEqualTo(
contentType);
// TODO: expires
assertThat(newContentMetadata.getUserMetadata()).isEqualTo(
userMetadata);
}
/**
 * Uploads {@code file} to {@code bucketName}/{@code key} as a multipart upload, in parts of
 * at most {@code configuration.getUploadPartSize()} bytes.
 *
 * @param key destination object key
 * @param file file whose contents are uploaded
 * @param objectMetadata metadata attached to the initiate request
 * @param maybeStorageClass storage class applied to the initiate request when present
 * @throws RuntimeException wrapping any exception raised while uploading parts or completing
 *     the upload; the multipart upload is aborted before it is thrown
 */
private void multipartUpload(
  String key,
  File file,
  ObjectMetadata objectMetadata,
  Optional<StorageClass> maybeStorageClass
)
  throws Exception {
  List<PartETag> uploadedParts = new ArrayList<>();

  InitiateMultipartUploadRequest initRequest =
    new InitiateMultipartUploadRequest(bucketName, key, objectMetadata);
  if (maybeStorageClass.isPresent()) {
    initRequest.setStorageClass(maybeStorageClass.get());
  }
  InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);

  long contentLength = file.length();
  long maxPartSize = configuration.getUploadPartSize();

  try {
    long offset = 0;
    int partNumber = 1;
    while (offset < contentLength) {
      // The last part may be shorter than the configured part size.
      long currentPartSize = Math.min(maxPartSize, contentLength - offset);
      UploadPartRequest partRequest = new UploadPartRequest()
        .withBucketName(bucketName)
        .withKey(key)
        .withUploadId(initResponse.getUploadId())
        .withPartNumber(partNumber)
        .withFileOffset(offset)
        .withFile(file)
        .withPartSize(currentPartSize);
      uploadedParts.add(s3Client.uploadPart(partRequest).getPartETag());
      offset += currentPartSize;
      partNumber++;
    }

    s3Client.completeMultipartUpload(
      new CompleteMultipartUploadRequest(
        bucketName,
        key,
        initResponse.getUploadId(),
        uploadedParts
      )
    );
  } catch (Exception e) {
    // Abort the upload before reporting the failure.
    s3Client.abortMultipartUpload(
      new AbortMultipartUploadRequest(bucketName, key, initResponse.getUploadId())
    );
    throw new RuntimeException(e);
  }
}