下面列出了com.amazonaws.services.s3.model.RestoreObjectRequest#com.amazonaws.services.s3.model.StorageClass 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Tries to freshen an object that already exists in the bucket by copying it onto itself
 * with the {@code Standard} storage class.
 *
 * @param object reference to the remote object; cast to {@code S3RemoteObjectReference}
 * @return {@code FRESHENED} when the in-place copy completes, {@code UPLOAD_REQUIRED} when the
 *         service answers 404 or 403
 * @throws InterruptedException if interrupted while waiting for the copy to complete
 */
@Override
public FreshenResult freshenRemoteObject(final RemoteObjectReference object) throws InterruptedException {
    final String key = ((S3RemoteObjectReference) object).canonicalPath;
    final String bucket = request.storageLocation.bucket;

    // Source and destination are the same bucket/key, i.e. an in-place copy.
    final CopyObjectRequest inPlaceCopy =
            new CopyObjectRequest(bucket, key, bucket, key).withStorageClass(StorageClass.Standard);

    try {
        transferManager.copy(inPlaceCopy).waitForCompletion();
    } catch (final AmazonServiceException e) {
        final int status = e.getStatusCode();
        // AWS S3 under certain access policies can't return NoSuchKey (404);
        // it returns AccessDenied (403) instead, so both are treated as "key does not exist".
        if (status == 404 || status == 403) {
            return FreshenResult.UPLOAD_REQUIRED;
        }
        throw e;
    }
    return FreshenResult.FRESHENED;
}
public MultipartState(String buf) {
String[] fields = buf.split(SEPARATOR);
_uploadId = fields[0];
_filePosition = Long.parseLong(fields[1]);
_partETags = new ArrayList<>();
for (String part : fields[2].split(",")) {
if (part != null && !part.isEmpty()) {
String[] partFields = part.split("/");
_partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1]));
}
}
_partSize = Long.parseLong(fields[3]);
_storageClass = StorageClass.fromValue(fields[4]);
_contentLength = Long.parseLong(fields[5]);
_timestamp = Long.parseLong(fields[6]);
}
/**
 * Buffers the cache entry in memory and uploads it to S3 under the path derived from the key.
 *
 * @param key    build cache key used to compute the bucket path
 * @param writer source of the cache entry bytes
 * @throws BuildCacheException if buffering the entry fails with an {@link IOException}
 */
@Override
public void store(BuildCacheKey key, BuildCacheEntryWriter writer) {
    final String bucketPath = getBucketPath(key);
    logger.info("Start storing cache entry '{}' in S3 bucket", bucketPath);

    final ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentType(BUILD_CACHE_CONTENT_TYPE);

    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
        // The entry is fully buffered first so the content length is known before the upload.
        writer.writeTo(buffer);
        metadata.setContentLength(buffer.size());

        try (InputStream payload = new ByteArrayInputStream(buffer.toByteArray())) {
            final PutObjectRequest putRequest = getPutObjectRequest(bucketPath, metadata, payload);
            if (reducedRedundancy) {
                putRequest.withStorageClass(StorageClass.ReducedRedundancy);
            }
            s3.putObject(putRequest);
        }
    } catch (IOException e) {
        throw new BuildCacheException("Error while storing cache object in S3 bucket", e);
    }
}
/**
 * Test fixture: fills {@code copierOptions} with an explicit value for each copier option
 * key and creates the parser under test.
 */
@Before
public void init() {
// Credentials, multipart sizing, encryption and storage class.
copierOptions.put(CREDENTIAL_PROVIDER, URI.create("localjceks://file/foo/bar.jceks"));
copierOptions.put(MULTIPART_UPLOAD_CHUNK_SIZE, 4096);
copierOptions.put(S3_SERVER_SIDE_ENCRYPTION, true);
copierOptions.put(STORAGE_CLASS, StorageClass.Glacier.toString());
// Throughput / parallelism settings.
copierOptions.put(TASK_BANDWIDTH, 1024);
copierOptions.put(NUMBER_OF_WORKERS_PER_MAP, 12);
copierOptions.put(MULTIPART_UPLOAD_THRESHOLD, 2048L);
copierOptions.put(MAX_MAPS, 5);
copierOptions.put(COPY_STRATEGY, "mycopystrategy");
// Logging, region, failure handling and endpoint.
copierOptions.put(LOG_PATH, new Path("hdfs:///tmp/logs"));
copierOptions.put(REGION, Regions.EU_WEST_1.getName());
copierOptions.put(IGNORE_FAILURES, false);
copierOptions.put(S3_ENDPOINT_URI, "http://s3.endpoint/");
// Upload retry/buffer settings and ACL.
copierOptions.put(UPLOAD_RETRY_COUNT, 5);
copierOptions.put(UPLOAD_RETRY_DELAY_MS, 520);
copierOptions.put(UPLOAD_BUFFER_SIZE, 1024);
copierOptions.put(CANNED_ACL, "bucket-owner-full-control");
parser = new S3MapReduceCpOptionsParser(SOURCES, TARGET, DEFAULT_CREDS_PROVIDER);
}
/**
 * Asserts that every getter of the given options returns the fixed value this test class expects
 * (Glacier storage class, 4096-byte part size, 12 upload workers, and so on).
 *
 * @param options the options instance to check
 */
private void assertDefaults(S3MapReduceCpOptions options) {
assertThat(options.getCredentialsProvider(), is(URI.create("localjceks://file/foo/bar.jceks")));
assertThat(options.getMultipartUploadPartSize(), is(4096L));
assertThat(options.isS3ServerSideEncryption(), is(true));
assertThat(options.getStorageClass(), is(StorageClass.Glacier.toString()));
assertThat(options.getMaxBandwidth(), is(1024L));
assertThat(options.getNumberOfUploadWorkers(), is(12));
assertThat(options.getMultipartUploadThreshold(), is(2048L));
assertThat(options.getMaxMaps(), is(5));
assertThat(options.getCopyStrategy(), is("mycopystrategy"));
assertThat(options.getLogPath(), is(new Path("hdfs:///tmp/logs")));
assertThat(options.getRegion(), is(Regions.EU_WEST_1.getName()));
assertThat(options.isIgnoreFailures(), is(false));
assertThat(options.getS3EndpointUri(), is(URI.create("http://s3.endpoint/")));
assertThat(options.getUploadRetryCount(), is(5));
// The retry delay is compared as a long.
assertThat(options.getUploadRetryDelayMs(), is(520L));
assertThat(options.getUploadBufferSize(), is(1024));
assertThat(options.getCannedAcl(), is("bucket-owner-full-control"));
}
/**
 * Checks the values reported by a {@code S3MapReduceCpOptions} created with its no-argument
 * constructor, before anything has been configured.
 */
@Test
public void defaultValues() {
S3MapReduceCpOptions options = new S3MapReduceCpOptions();
assertThat(options.isHelp(), is(false));
assertThat(options.isBlocking(), is(true));
// Nothing has been supplied yet, so sources, target and credentials are null.
assertThat(options.getSources(), is(nullValue()));
assertThat(options.getTarget(), is(nullValue()));
assertThat(options.getCredentialsProvider(), is(nullValue()));
// 5 MiB part size.
assertThat(options.getMultipartUploadPartSize(), is(5L * 1024 * 1024));
assertThat(options.isS3ServerSideEncryption(), is(false));
assertThat(options.getStorageClass(), is(StorageClass.Standard.toString()));
assertThat(options.getMaxBandwidth(), is(100L));
assertThat(options.getNumberOfUploadWorkers(), is(20));
// 16 MiB multipart threshold.
assertThat(options.getMultipartUploadThreshold(), is(16L * 1024 * 1024));
assertThat(options.getMaxMaps(), is(20));
assertThat(options.getCopyStrategy(), is("uniformsize"));
assertThat(options.getLogPath(), is(nullValue()));
assertThat(options.getRegion(), is(nullValue()));
assertThat(options.isIgnoreFailures(), is(false));
assertThat(options.getS3EndpointUri(), is(nullValue()));
assertThat(options.getUploadRetryCount(), is(3));
assertThat(options.getUploadRetryDelayMs(), is(300L));
assertThat(options.getUploadBufferSize(), is(0));
assertThat(options.getCannedAcl(), is(nullValue()));
assertThat(options.getAssumeRole(), is(nullValue()));
}
/**
 * Uploads a single file to the destination bucket, under the destination key joined with the
 * file's name.
 *
 * @param src        the local file to upload
 * @param dstDetails destination bucket and key
 * @param archive    when {@code true}, the storage-class header is set from {@code archiveStorageClass}
 * @return {@code true} if the put succeeded, {@code false} if any exception was thrown (it is logged)
 */
private boolean uploadFile(File src, S3Details dstDetails, boolean archive) {
    final String bucket = dstDetails.getBucket();
    final String objectKey = FileUtil.appendPath(dstDetails.getKey(), src.getName());
    final PutObjectRequest putRequest = new PutObjectRequest(bucket, objectKey, src);

    final ObjectMetadata metadata = new ObjectMetadata();
    if (archive) {
        metadata.setHeader(Headers.STORAGE_CLASS,
                StorageClass.fromValue(StringUtils.upperCase(archiveStorageClass)));
    }
    if (encrypt) {
        metadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
    }
    putRequest.setMetadata(metadata);

    try {
        amazonS3.putObject(putRequest);
    } catch (Exception e) {
        log.error("Error while uploading file: {}", src, e);
        return false;
    }
    return true;
}
/**
 * Puts a Glacier-class object with the OngoingRestore flag set to false, calls
 * {@code s3Dao.restoreObjects} for it, and expects the object's metadata to report an
 * ongoing restore afterwards.
 */
@Test
public void testRestoreObjects()
{
// Put a 1 byte Glacier storage class file in S3.
ObjectMetadata metadata = new ObjectMetadata();
metadata.setHeader(Headers.STORAGE_CLASS, StorageClass.Glacier);
metadata.setOngoingRestore(false);
s3Operations
.putObject(new PutObjectRequest(storageDaoTestHelper.getS3ManagedBucketName(), TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), metadata),
null);
// Initiate a restore request for the test S3 file.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(storageDaoTestHelper.getS3ManagedBucketName());
params.setFiles(Arrays.asList(new File(TARGET_S3_KEY)));
s3Dao.restoreObjects(params, S3_RESTORE_OBJECT_EXPIRATION_IN_DAYS, ARCHIVE_RETRIEVAL_OPTION);
// Validate that there is an ongoing restore request for this object.
ObjectMetadata objectMetadata = s3Operations.getObjectMetadata(storageDaoTestHelper.getS3ManagedBucketName(), TARGET_S3_KEY, null);
assertTrue(objectMetadata.getOngoingRestore());
}
/**
 * Puts a Glacier-class object whose OngoingRestore flag is already true, calls
 * {@code s3Dao.restoreObjects} for it, and expects the flag to still be true afterwards.
 */
@Test
public void testRestoreObjectsGlacierObjectAlreadyBeingRestored()
{
// Put a 1 byte Glacier storage class file in S3 flagged as already being restored.
ObjectMetadata metadata = new ObjectMetadata();
metadata.setHeader(Headers.STORAGE_CLASS, StorageClass.Glacier);
metadata.setOngoingRestore(true);
s3Operations
.putObject(new PutObjectRequest(storageDaoTestHelper.getS3ManagedBucketName(), TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), metadata),
null);
// Initiate a restore request for the test S3 file.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(storageDaoTestHelper.getS3ManagedBucketName());
params.setFiles(Arrays.asList(new File(TARGET_S3_KEY)));
s3Dao.restoreObjects(params, S3_RESTORE_OBJECT_EXPIRATION_IN_DAYS, ARCHIVE_RETRIEVAL_OPTION);
// Validate that there is still an ongoing restore request for this object.
ObjectMetadata objectMetadata = s3Operations.getObjectMetadata(storageDaoTestHelper.getS3ManagedBucketName(), TARGET_S3_KEY, null);
assertTrue(objectMetadata.getOngoingRestore());
}
/**
 * Puts a Glacier-class object with the OngoingRestore flag set to false and expects
 * {@code s3Dao.validateGlacierS3FilesRestored} to complete without throwing.
 */
@Test
public void testValidateGlacierS3FilesRestored()
{
// Put a 1 byte already restored Glacier storage class file in S3.
ObjectMetadata metadata = new ObjectMetadata();
metadata.setHeader(Headers.STORAGE_CLASS, StorageClass.Glacier);
metadata.setOngoingRestore(false);
s3Operations
.putObject(new PutObjectRequest(storageDaoTestHelper.getS3ManagedBucketName(), TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), metadata),
null);
// Validate the file.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(storageDaoTestHelper.getS3ManagedBucketName());
params.setFiles(Arrays.asList(new File(TARGET_S3_KEY)));
// The test passes if this call does not throw; there is no further assertion.
s3Dao.validateGlacierS3FilesRestored(params);
}
/**
 * Puts a Glacier-class object without setting the OngoingRestore flag and expects
 * {@code s3Dao.validateGlacierS3FilesRestored} to throw an {@link IllegalArgumentException}
 * with the exact "not restored" message checked below.
 */
@Test
public void testValidateGlacierS3FilesRestoredGlacierObjectRestoreNotInitiated()
{
// Put a 1 byte Glacier storage class file in S3 that has no restore initiated (OngoingRestore flag is null).
ObjectMetadata metadata = new ObjectMetadata();
metadata.setHeader(Headers.STORAGE_CLASS, StorageClass.Glacier);
s3Operations
.putObject(new PutObjectRequest(storageDaoTestHelper.getS3ManagedBucketName(), TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), metadata),
null);
// Try to validate if the Glacier S3 file is already restored.
try
{
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(storageDaoTestHelper.getS3ManagedBucketName());
params.setFiles(Arrays.asList(new File(TARGET_S3_KEY)));
s3Dao.validateGlacierS3FilesRestored(params);
// Reaching this line means no exception was thrown, which is a failure.
fail("Should throw an IllegalArgumentException when Glacier S3 file is not restored.");
}
catch (IllegalArgumentException e)
{
// The message must name the key, the GLACIER storage class, the null flag and the bucket.
assertEquals(String
.format("Archived S3 file \"%s\" is not restored. StorageClass {GLACIER}, OngoingRestore flag {null}, S3 bucket name {%s}",
TARGET_S3_KEY, storageDaoTestHelper.getS3ManagedBucketName()), e.getMessage());
}
}
public MultipartState(String buf) {
String[] fields = buf.split(SEPARATOR);
_uploadId = fields[0];
_filePosition = Long.parseLong(fields[1]);
_partETags = new ArrayList<>();
for (String part : fields[2].split(",")) {
if (part != null && !part.isEmpty()) {
String[] partFields = part.split("/");
_partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1]));
}
}
_partSize = Long.parseLong(fields[3]);
_storageClass = StorageClass.fromValue(fields[4]);
_contentLength = Long.parseLong(fields[5]);
_timestamp = Long.parseLong(fields[6]);
}
/**
 * For every {@link StorageClass} value: sets the processor's storage class property to the
 * enum constant name, runs the processor once, and checks that the single
 * {@code PutObjectRequest} sent to the mocked client carries that storage class.
 */
@Test
public void testStorageClasses() {
for (StorageClass storageClass : StorageClass.values()) {
runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name());
prepareTest();
runner.run(1);
// Capture the request passed to the mock; exactly one putObject call is expected.
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals(storageClass.toString(), request.getStorageClass());
// Clear recorded invocations so the next iteration's times(1) check counts only its own call.
Mockito.reset(mockS3Client);
}
}
/**
 * For every {@link StorageClass} value: enqueues the sample resource under a
 * storage-class-specific filename, runs the processor, and checks that the resulting flow
 * file is routed to success with a storage-class attribute equal to that value.
 */
@Test
public void testStorageClasses() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
// A property not declared above must still be accepted as valid.
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
for (StorageClass storageClass : StorageClass.values()) {
runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "testStorageClasses/small_" + storageClass.name() + ".txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
FlowFile file = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
Assert.assertEquals(storageClass.toString(), file.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
// Drop this iteration's transferred flow files before the next storage class is tried.
runner.clearTransferState();
}
}
/**
 * Starts a multipart upload for {@code this.bucket}/{@code this.object} and returns its upload id.
 *
 * <p>The storage class is {@code ReducedRedundancy} when {@code useRRS} is set, otherwise
 * {@code Standard}. If the initiation does not succeed, {@code abortUpload()} is called before
 * the exception propagates.
 *
 * @return the upload id reported by S3
 * @throws IOException if S3 rejects the request; the original {@link AmazonServiceException}
 *         is attached as the cause
 */
private String initiateMultipartUpload() throws IOException {
    final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.object);
    request.setStorageClass(this.useRRS ? StorageClass.ReducedRedundancy : StorageClass.Standard);

    boolean operationSuccessful = false;
    try {
        final InitiateMultipartUploadResult result = this.s3Client.initiateMultipartUpload(request);
        operationSuccessful = true;
        return result.getUploadId();
    } catch (AmazonServiceException e) {
        // Keep the stringified message for existing log consumers, but also chain the cause so
        // callers can still inspect the service error (status code, error code, request id).
        throw new IOException(StringUtils.stringifyException(e), e);
    } finally {
        if (!operationSuccessful) {
            abortUpload();
        }
    }
}
/**
 * Resolves a configured storage class name to a {@link StorageClass}.
 *
 * <p>A {@code null} or empty name yields {@code Standard}. Otherwise the name is upper-cased
 * (English locale) and looked up with {@code StorageClass.fromValue}.
 *
 * @param storageClass the configured name, may be {@code null} or empty
 * @return the resolved storage class
 * @throws BlobStoreException if the name resolves to Glacier or is not a known storage class
 */
public static StorageClass initStorageClass(String storageClass) {
    final boolean unset = storageClass == null || storageClass.isEmpty();
    if (unset) {
        return StorageClass.Standard;
    }

    try {
        final String normalized = storageClass.toUpperCase(Locale.ENGLISH);
        final StorageClass resolved = StorageClass.fromValue(normalized);
        if (resolved.equals(StorageClass.Glacier)) {
            throw new BlobStoreException("Glacier storage class is not supported");
        }
        return resolved;
    } catch (final IllegalArgumentException unknownName) {
        throw new BlobStoreException("`" + storageClass + "` is not a valid S3 Storage Class.");
    }
}
/**
 * Stubbed two-page listing.
 *
 * <p>A request without the expected continuation token gets the first page: one Standard
 * object, marked truncated, with the token "continue" as the next continuation token. A request
 * carrying that token gets the second page: another Standard object, plus a Glacier object when
 * {@code hasGlacierObjects} is set.
 */
@Override
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request)
{
    final String secondPageToken = "continue";
    final ListObjectsV2Result page = new ListObjectsV2Result();

    if (!secondPageToken.equals(listObjectsV2Request.getContinuationToken())) {
        // First page: one object, and more results are advertised.
        S3ObjectSummary firstStandard = new S3ObjectSummary();
        firstStandard.setStorageClass(StorageClass.Standard.toString());
        firstStandard.setKey("test/standardOne");
        firstStandard.setLastModified(new Date());
        page.getObjectSummaries().add(firstStandard);
        page.setTruncated(true);
        page.setNextContinuationToken(secondPageToken);
        return page;
    }

    // Second (final) page.
    S3ObjectSummary secondStandard = new S3ObjectSummary();
    secondStandard.setStorageClass(StorageClass.Standard.toString());
    secondStandard.setKey("test/standardTwo");
    secondStandard.setLastModified(new Date());
    page.getObjectSummaries().add(secondStandard);

    if (hasGlacierObjects) {
        S3ObjectSummary archived = new S3ObjectSummary();
        archived.setStorageClass(StorageClass.Glacier.toString());
        archived.setKey("test/glacier");
        archived.setLastModified(new Date());
        page.getObjectSummaries().add(archived);
    }
    return page;
}
/**
 * Creates an empty state: blank upload id, no part ETags, all counters at zero,
 * {@code Standard} storage class, and the timestamp set to the current time in milliseconds.
 */
public MultipartState() {
    this._uploadId = "";
    this._storageClass = StorageClass.Standard;
    this._partETags = new ArrayList<>();

    // Positions and sizes all start at zero.
    this._filePosition = 0L;
    this._partSize = 0L;
    this._contentLength = 0L;

    this._timestamp = System.currentTimeMillis();
}
/**
 * Uploads two flow files with the storage class property set to {@code ReducedRedundancy}
 * (the sample resource and a generated text payload) and checks that both arrive on the
 * success relationship with the ReducedRedundancy storage-class attribute.
 */
@Test
public void testStorageClass() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
// NOTE(review): the builder reserves ~55 MB of capacity, but the loop below appends only
// 55 lines of 64 characters (3520 characters in total). The name data55mb suggests a 55 MB
// payload was intended -- confirm whether the loop bound should derive from bytesNeeded.
int bytesNeeded = 55 * 1024 * 1024;
StringBuilder bldr = new StringBuilder(bytesNeeded + 1000);
for (int line = 0; line < 55; line++) {
bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line));
}
String data55mb = bldr.toString();
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
// The same attribute map is reused; its filename is overwritten before the second enqueue.
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/2.txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
attrs.put("filename", "folder/3.txt");
runner.enqueue(data55mb.getBytes(), attrs);
runner.run(2);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2);
FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
Assert.assertEquals(StorageClass.ReducedRedundancy.toString(),
file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1);
Assert.assertEquals(StorageClass.ReducedRedundancy.toString(),
file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
}
/**
 * Checks the field values of a {@code MultipartState} created with the no-argument constructor:
 * empty upload id, zero position/size/length, no part ETags and the Standard storage class.
 */
@Test
public void testStateDefaults() {
    PutS3Object.MultipartState state1 = new PutS3Object.MultipartState();
    // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
    // "expected:<...> but was:<...>" failure message report the values the right way round.
    Assert.assertEquals("", state1.getUploadId());
    Assert.assertEquals((Long) 0L, state1.getFilePosition());
    Assert.assertEquals(0L, state1.getPartETags().size());
    Assert.assertEquals((Long) 0L, state1.getPartSize());
    Assert.assertEquals(StorageClass.Standard.toString(), state1.getStorageClass().toString());
    Assert.assertEquals((Long) 0L, state1.getContentLength());
}
/**
 * Populates a {@code MultipartState} through its setters and checks that {@code toString()}
 * yields the expected '#'-separated representation, with part ETags rendered as
 * comma-separated {@code number/etag} pairs.
 */
@Test
public void testStateToString() throws IOException, InitializationException {
    final String expected = "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4#20002#REDUCED_REDUNDANCY#30003#8675309";

    final PutS3Object.MultipartState state = new PutS3Object.MultipartState();
    state.setUploadId("UID-test1234567890");
    state.setFilePosition(10001L);
    state.setTimestamp(8675309L);

    // Parts 1 through 4, each tagged "PartETag-<number>".
    for (int partNumber = 1; partNumber <= 4; partNumber++) {
        final String etag = "PartETag-" + partNumber;
        state.addPartETag(new PartETag(partNumber, etag));
    }

    state.setPartSize(20002L);
    state.setStorageClass(StorageClass.ReducedRedundancy);
    state.setContentLength(30003L);

    Assert.assertEquals(expected, state.toString());
}
/**
 * With the service constructed with {@code true} as its last argument (presumably the
 * reduced-redundancy flag -- confirm against the constructor), {@code store} must upload the
 * object and call {@code withStorageClass(ReducedRedundancy)} on the put request.
 */
@Test
public void storePutsObjectAndUsesReducedRedundancyWhenConfigured() throws IOException {
/** Setup **/
buildCacheService = spy(new AwsS3BuildCacheService(s3, "bucketName", null, true));
// Hand back the mocked request so the storage-class interaction can be verified on it.
doReturn(putObjectRequest).when(buildCacheService).getPutObjectRequest(any(String.class),
any(ObjectMetadata.class), any(InputStream.class));
/** Run **/
buildCacheService.store(key, writer);
/** Check **/
verifyThatStoreStores("abcdefghijkl123456789");
verify(putObjectRequest).withStorageClass(eq(StorageClass.ReducedRedundancy));
}
/**
 * With the service constructed with {@code false} as its last argument (presumably the
 * reduced-redundancy flag -- confirm against the constructor), {@code store} must upload the
 * object and never call {@code withStorageClass(ReducedRedundancy)} on the put request.
 */
@Test
public void storePutsObjectAndDoesNotUseReducedRedundancyWhenConfigured() throws IOException {
/** Setup **/
buildCacheService = spy(new AwsS3BuildCacheService(s3, "bucketName", null, false));
// Hand back the mocked request so the absence of the storage-class call can be verified.
doReturn(putObjectRequest).when(buildCacheService).getPutObjectRequest(any(String.class),
any(ObjectMetadata.class), any(InputStream.class));
/** Run **/
buildCacheService.store(key, writer);
/** Check **/
verifyThatStoreStores("abcdefghijkl123456789");
verify(putObjectRequest, never()).withStorageClass(eq(StorageClass.ReducedRedundancy));
}
/**
 * With the service constructed with "cache" as its third argument, {@code store} must request
 * the put object under "cache/abcdefghijkl123456789" and must not apply the
 * ReducedRedundancy storage class.
 */
@Test
public void storePutsObjectAndUsesPathWhenConfigured() throws IOException {
/** Setup **/
buildCacheService = spy(new AwsS3BuildCacheService(s3, "bucketName", "cache", false));
// The stub only matches the prefixed path, so a wrong path would not return the mocked request.
doReturn(putObjectRequest).when(buildCacheService).getPutObjectRequest(eq("cache/abcdefghijkl123456789"),
any(ObjectMetadata.class), any(InputStream.class));
/** Run **/
buildCacheService.store(key, writer);
/** Check **/
verifyThatStoreStores("cache/abcdefghijkl123456789");
verify(putObjectRequest, never()).withStorageClass(eq(StorageClass.ReducedRedundancy));
}
/**
 * Supplies every copier option as a string, runs the copier, and checks that the
 * {@code S3MapReduceCpOptions} handed to the executor contain the corresponding typed values.
 */
@Test
public void overwriteAllCopierOptions() throws Exception {
// All values are given as strings; the assertions below expect typed results.
copierOptions.put(CREDENTIAL_PROVIDER, "jceks://hdfs/foo/bar.jceks");
copierOptions.put(MULTIPART_UPLOAD_CHUNK_SIZE, "1234");
copierOptions.put(S3_SERVER_SIDE_ENCRYPTION, "true");
// Lower-case on purpose: the assertion expects the StorageClass.ReducedRedundancy identifier.
copierOptions.put(STORAGE_CLASS, "reduced_redundancy");
copierOptions.put(TASK_BANDWIDTH, "567");
copierOptions.put(NUMBER_OF_WORKERS_PER_MAP, "89");
copierOptions.put(MULTIPART_UPLOAD_THRESHOLD, "123456");
copierOptions.put(MAX_MAPS, "78");
copierOptions.put(COPY_STRATEGY, "the-strategy");
copierOptions.put(LOG_PATH, "hdfs://path/to/logs/");
copierOptions.put(REGION, "us-east-1");
copierOptions.put(IGNORE_FAILURES, "true");
copierOptions.put(CANNED_ACL, CannedAccessControlList.BucketOwnerFullControl.toString());
S3MapReduceCpCopier copier = new S3MapReduceCpCopier(conf, sourceDataBaseLocation, Collections.<Path>emptyList(),
replicaDataLocation, copierOptions, executor, metricRegistry);
Metrics metrics = copier.copy();
assertThat(metrics, not(nullValue()));
// Capture the options the copier passed to the executor.
verify(executor).exec(confCaptor.capture(), optionsCaptor.capture());
S3MapReduceCpOptions options = optionsCaptor.getValue();
assertThat(options.getSources(), is(Arrays.asList(sourceDataBaseLocation)));
assertThat(options.getTarget(), is(replicaDataLocation.toUri()));
assertThat(options.getCredentialsProvider(), is(URI.create("jceks://hdfs/foo/bar.jceks")));
assertThat(options.getMultipartUploadPartSize(), is(1234L));
assertThat(options.isS3ServerSideEncryption(), is(true));
assertThat(options.getStorageClass(), is(StorageClass.ReducedRedundancy.toString()));
assertThat(options.getMaxBandwidth(), is(567L));
assertThat(options.getNumberOfUploadWorkers(), is(89));
assertThat(options.getMultipartUploadThreshold(), is(123456L));
assertThat(options.getMaxMaps(), is(78));
assertThat(options.getCopyStrategy(), is("the-strategy"));
assertThat(options.getLogPath(), is(new Path("hdfs://path/to/logs/")));
assertThat(options.getRegion(), is(Regions.US_EAST_1.getName()));
assertThat(options.isIgnoreFailures(), is(true));
assertThat(options.getCannedAcl(), is(CannedAccessControlList.BucketOwnerFullControl.toString()));
}
/**
 * Builds options with only the storage class overridden (Glacier) and checks that the storage
 * class is applied while the other settings match their {@code ConfigurationVariable} defaults.
 */
@Test
public void builderWithStorageClass() {
S3MapReduceCpOptions options = S3MapReduceCpOptions
.builder(SOURCES, TARGET)
.storageClass(StorageClass.Glacier.toString())
.build();
assertThat(options.isHelp(), is(false));
assertThat(options.isBlocking(), is(true));
assertThat(options.getSources(), is(SOURCES));
assertThat(options.getTarget(), is(TARGET));
assertThat(options.getCredentialsProvider(), is(ConfigurationVariable.CREDENTIAL_PROVIDER.defaultURIValue()));
assertThat(options.getMultipartUploadPartSize(),
is(ConfigurationVariable.MINIMUM_UPLOAD_PART_SIZE.defaultLongValue()));
assertThat(options.isS3ServerSideEncryption(),
is(ConfigurationVariable.S3_SERVER_SIDE_ENCRYPTION.defaultBooleanValue()));
// The one value explicitly set through the builder.
assertThat(options.getStorageClass(), is(StorageClass.Glacier.toString()));
assertThat(options.getMaxBandwidth(), is(ConfigurationVariable.MAX_BANDWIDTH.defaultLongValue()));
assertThat(options.getNumberOfUploadWorkers(),
is(ConfigurationVariable.NUMBER_OF_UPLOAD_WORKERS.defaultIntValue()));
assertThat(options.getMultipartUploadThreshold(),
is(ConfigurationVariable.MULTIPART_UPLOAD_THRESHOLD.defaultLongValue()));
assertThat(options.getMaxMaps(), is(ConfigurationVariable.MAX_MAPS.defaultIntValue()));
assertThat(options.getCopyStrategy(), is(ConfigurationVariable.COPY_STRATEGY.defaultValue()));
assertThat(options.getLogPath(), is(nullValue()));
assertThat(options.getRegion(), is(ConfigurationVariable.REGION.defaultValue()));
assertThat(options.isIgnoreFailures(), is(ConfigurationVariable.IGNORE_FAILURES.defaultBooleanValue()));
assertThat(options.getS3EndpointUri(), is(ConfigurationVariable.S3_ENDPOINT_URI.defaultURIValue()));
assertThat(options.getUploadRetryCount(), is(ConfigurationVariable.UPLOAD_RETRY_COUNT.defaultIntValue()));
assertThat(options.getUploadRetryDelayMs(), is(ConfigurationVariable.UPLOAD_RETRY_DELAY_MS.defaultLongValue()));
assertThat(options.getUploadBufferSize(), is(ConfigurationVariable.UPLOAD_BUFFER_SIZE.defaultIntValue()));
assertThat(options.getCannedAcl(), is(ConfigurationVariable.CANNED_ACL.defaultValue()));
assertThat(options.getAssumeRole(), is(ConfigurationVariable.ASSUME_ROLE.defaultValue()));
}
/**
 * Uploads a directory (including subdirectories) to the destination bucket/key and waits for
 * the transfer to finish.
 *
 * @param location   the local directory to upload
 * @param dstDetails destination bucket and key prefix
 * @param archive    when {@code true}, each file's storage-class header is set from
 *                   {@code archiveStorageClass}
 * @return {@code true} if the upload completed, {@code false} if it failed or the waiting thread
 *         was interrupted (the error is logged in both cases)
 */
private boolean uploadDirectory(File location, S3Details dstDetails, boolean archive) {
    // Applied by the transfer manager to the metadata of every uploaded file.
    ObjectMetadataProvider metaDataProvider = (file, meta) -> {
        if (archive) {
            meta.setHeader(Headers.STORAGE_CLASS,
                    StorageClass.fromValue(StringUtils.upperCase(archiveStorageClass)));
        }
        if (encrypt) {
            meta.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
        }
    };

    MultipleFileUpload upload = transferManager
            .uploadDirectory(dstDetails.getBucket(), dstDetails.getKey(), location, true, metaDataProvider);

    if (log.isDebugEnabled()) {
        // Typed local keeps the lambda's target type explicit.
        ProgressListener progressListener = progressEvent -> log
                .debug("S3 Transferred bytes: " + progressEvent.getBytesTransferred());
        upload.addProgressListener(progressListener);
    }

    try {
        upload.waitForCompletion();
        return true;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe the
        // interruption; the previous catch-all swallowed it.
        Thread.currentThread().interrupt();
        log.error("Error while uploading directory: {}", location, e);
    } catch (Exception e) {
        log.error("Error while uploading directory: {}", location, e);
    }
    return false;
}
/**
 * Moves an S3 object by copying it to the destination and then deleting the source.
 *
 * <p>S3 has no move operation, so this is copy-then-delete and is not atomic. The copy is given
 * the storage class named by {@code archiveStorageClass} when {@code archive} is set, otherwise
 * the one named by {@code uploadStorageClass}.
 *
 * @param src     source location, parsed with {@code S3Details.from}
 * @param dst     destination location, parsed with {@code S3Details.from}
 * @param archive selects which configured storage class the copy receives
 * @return {@code true} if the copy call returned without throwing; {@code false} if either
 *         location could not be parsed or an exception occurred (it is logged)
 */
@Override
public boolean move(String src, String dst, boolean archive) {
    log.info("Move: {} to {}", src, dst);

    Optional<S3Details> srcDetails = S3Details.from(src);
    Optional<S3Details> dstDetails = S3Details.from(dst);
    if (!srcDetails.isPresent() || !dstDetails.isPresent()) {
        return false;
    }
    S3Details source = srcDetails.get();
    S3Details target = dstDetails.get();

    CopyObjectRequest cor = new CopyObjectRequest(source.getBucket(), source.getKey(),
            target.getBucket(), target.getKey());
    // Make sure to set the storage class for the copy: archive files and parquet files
    // use different configured classes.
    String storageClassName = archive ? archiveStorageClass : uploadStorageClass;
    cor.setStorageClass(StorageClass.fromValue(StringUtils.upperCase(storageClassName)));

    try {
        CopyObjectResult r = amazonS3.copyObject(cor);
        if (Objects.nonNull(r.getETag())) {
            // copy ok, delete src
            amazonS3.deleteObject(source.getBucket(), source.getKey());
        }
        // NOTE(review): true is returned even when the ETag is null and the source was kept --
        // preserved as-is; confirm whether callers rely on that.
        return true;
    } catch (Exception e) {
        // The format string previously had a single placeholder, so the destination was
        // silently dropped from the message.
        log.error("Error during copying {} to {}", src, dst, e);
    }
    return false;
}
/**
 * Executes S3 specific steps required to expire business object data.
 *
 * <p>Lists the S3 files under the key prefix, validates them against the registered storage
 * files, and then issues restore requests with a 1 day expiration for the files whose storage
 * class is Glacier or DeepArchive.
 *
 * @param businessObjectDataRestoreDto the DTO that holds various parameters needed to expire business object data
 */
protected void executeS3SpecificStepsImpl(BusinessObjectDataRestoreDto businessObjectDataRestoreDto)
{
    // Parameters for accessing the S3 bucket. The key prefix denotes a directory, so it gets a trailing '/'.
    S3FileTransferRequestParamsDto transferParams = storageHelper.getS3FileTransferRequestParamsDto();
    transferParams.setS3Endpoint(businessObjectDataRestoreDto.getS3Endpoint());
    transferParams.setS3BucketName(businessObjectDataRestoreDto.getS3BucketName());
    transferParams.setS3KeyPrefix(StringUtils.appendIfMissing(businessObjectDataRestoreDto.getS3KeyPrefix(), "/"));

    // List the S3 files under the prefix, ignoring 0 byte objects that represent S3 directories.
    List<S3ObjectSummary> listedFiles = s3Service.listDirectory(transferParams, true);

    // Validate existence and file size of the S3 files.
    storageFileHelper.validateRegisteredS3Files(businessObjectDataRestoreDto.getStorageFiles(), listedFiles,
        businessObjectDataRestoreDto.getStorageName(), businessObjectDataRestoreDto.getBusinessObjectDataKey());

    // Only objects in the Glacier or DeepArchive storage class are candidates for expiration.
    final String glacierId = StorageClass.Glacier.toString();
    final String deepArchiveId = StorageClass.DeepArchive.toString();
    List<S3ObjectSummary> archivedFiles = new ArrayList<>();
    for (S3ObjectSummary listedFile : listedFiles)
    {
        String storageClassId = listedFile.getStorageClass();
        if (glacierId.equals(storageClassId) || deepArchiveId.equals(storageClassId))
        {
            archivedFiles.add(listedFile);
        }
    }

    // Set the list of files to expire.
    transferParams.setFiles(storageFileHelper.getFiles(storageFileHelper.createStorageFilesFromS3ObjectSummaries(archivedFiles)));

    // To expire the restored S3 objects, initiate restore requests with expiration set to 1 day.
    s3Service.restoreObjects(transferParams, 1, null);
}
/**
 * Executes S3 specific steps for the business object data finalize restore.
 *
 * <p>Lists the S3 files under the key prefix, validates them against the registered storage
 * files, and then asks the S3 service to validate that the files whose storage class is Glacier
 * or DeepArchive have been restored.
 *
 * @param businessObjectDataRestoreDto the DTO that holds various parameters needed to perform a business object data restore
 */
protected void executeS3SpecificStepsImpl(BusinessObjectDataRestoreDto businessObjectDataRestoreDto)
{
    // Parameters for accessing the S3 bucket. The key prefix denotes a directory, so it gets a trailing '/'.
    S3FileTransferRequestParamsDto transferParams = storageHelper.getS3FileTransferRequestParamsDto();
    transferParams.setS3BucketName(businessObjectDataRestoreDto.getS3BucketName());
    transferParams.setS3Endpoint(businessObjectDataRestoreDto.getS3Endpoint());
    transferParams.setS3KeyPrefix(StringUtils.appendIfMissing(businessObjectDataRestoreDto.getS3KeyPrefix(), "/"));

    // Select all S3 keys matching the prefix from the bucket, ignoring 0 byte objects that
    // represent S3 directories.
    List<S3ObjectSummary> listedFiles = s3Service.listDirectory(transferParams, true);

    // Validate existence and file size of the S3 files.
    storageFileHelper.validateRegisteredS3Files(businessObjectDataRestoreDto.getStorageFiles(), listedFiles,
        businessObjectDataRestoreDto.getStorageName(), businessObjectDataRestoreDto.getBusinessObjectDataKey());

    // Only objects currently in the Glacier or DeepArchive storage class need a restore-status check.
    final String glacierId = StorageClass.Glacier.toString();
    final String deepArchiveId = StorageClass.DeepArchive.toString();
    List<S3ObjectSummary> archivedFiles = new ArrayList<>();
    for (S3ObjectSummary listedFile : listedFiles)
    {
        String storageClassId = listedFile.getStorageClass();
        boolean archived = glacierId.equals(storageClassId) || deepArchiveId.equals(storageClassId);
        if (archived)
        {
            archivedFiles.add(listedFile);
        }
    }

    // Validate that all selected S3 files are now restored.
    transferParams.setFiles(storageFileHelper.getFiles(storageFileHelper.createStorageFilesFromS3ObjectSummaries(archivedFiles)));
    s3Service.validateGlacierS3FilesRestored(transferParams);
}