下面列出了怎么用com.amazonaws.services.s3.model.S3VersionSummary的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjects(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
return versionListing;
}
/**
* Executes S3 specific steps required for initiation of a business object data destroy.
*
* @param businessObjectDataDestroyDto the DTO that holds various parameters needed to initiate a business object data destroy
*/
void executeS3SpecificStepsImpl(BusinessObjectDataDestroyDto businessObjectDataDestroyDto)
{
// Create an S3 file transfer parameters DTO to access the S3 bucket.
// Since the S3 key prefix represents a directory, we add a trailing '/' character to it.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = storageHelper.getS3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3Endpoint(businessObjectDataDestroyDto.getS3Endpoint());
s3FileTransferRequestParamsDto.setS3BucketName(businessObjectDataDestroyDto.getS3BucketName());
s3FileTransferRequestParamsDto.setS3KeyPrefix(StringUtils.appendIfMissing(businessObjectDataDestroyDto.getS3KeyPrefix(), "/"));
// Create an S3 file transfer parameters DTO to be used for S3 object tagging operation.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = storageHelper
.getS3FileTransferRequestParamsDtoByRole(businessObjectDataDestroyDto.getS3ObjectTaggerRoleArn(),
businessObjectDataDestroyDto.getS3ObjectTaggerRoleSessionName());
s3ObjectTaggerParamsDto.setS3Endpoint(businessObjectDataDestroyDto.getS3Endpoint());
// Get all S3 objects matching the S3 key prefix from the S3 bucket.
List<S3VersionSummary> s3VersionSummaries = s3Service.listVersions(s3FileTransferRequestParamsDto);
// Tag the S3 objects to initiate the deletion.
s3Service.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, s3VersionSummaries,
new Tag(businessObjectDataDestroyDto.getS3ObjectTagKey(), businessObjectDataDestroyDto.getS3ObjectTagValue()));
}
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjects(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
return versionListing;
}
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjectsV2(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
return versionListing;
}
private boolean deleteObject(final String bucket, final String key) {
this.s3Client.deleteObject(bucket, key);
final VersionListing versionList = this.s3Client.listVersions(bucket, key);
for (final S3VersionSummary summary : versionList.getVersionSummaries()) {
logger.debug("deleting version {}", summary.getKey());
this.s3Client.deleteVersion(bucket, summary.getKey(), summary.getVersionId());
}
return true;
}
@Test
public void testDeleteDirectoryAssertHandleAmazonClientException()
{
S3Operations originalS3Operations = (S3Operations) ReflectionTestUtils.getField(s3Dao, "s3Operations");
S3Operations mockS3Operations = mock(S3Operations.class);
ReflectionTestUtils.setField(s3Dao, "s3Operations", mockS3Operations);
try
{
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName("s3BucketName");
s3FileTransferRequestParamsDto.setS3KeyPrefix("s3KeyPrefix");
VersionListing versionListing = new VersionListing();
versionListing.getVersionSummaries().add(new S3VersionSummary());
when(mockS3Operations.listVersions(any(), any())).thenReturn(versionListing);
when(mockS3Operations.deleteObjects(any(), any())).thenThrow(new AmazonClientException("message"));
try
{
s3Dao.deleteDirectory(s3FileTransferRequestParamsDto);
fail();
}
catch (Exception e)
{
assertEquals(IllegalStateException.class, e.getClass());
assertEquals("Failed to delete keys/key versions with prefix \"s3KeyPrefix\" from bucket \"s3BucketName\". Reason: message", e.getMessage());
}
}
finally
{
ReflectionTestUtils.setField(s3Dao, "s3Operations", originalS3Operations);
}
}
@Test
public void testTagVersionsAmazonServiceException()
{
// Create an S3 file transfer request parameters DTO to access S3 objects with a mocked S3 bucket name that would trigger an AWS exception.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_INTERNAL_ERROR);
// Create an S3 version summary.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
try
{
s3Dao.tagVersions(params, new S3FileTransferRequestParamsDto(), Collections.singletonList(s3VersionSummary), tag);
fail();
}
catch (IllegalStateException e)
{
assertEquals(String.format("Failed to tag S3 object with \"%s\" key and \"%s\" version id in \"%s\" bucket. " +
"Reason: InternalError (Service: null; Status Code: 0; Error Code: InternalError; Request ID: null)", S3_KEY, S3_VERSION_ID,
MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_INTERNAL_ERROR), e.getMessage());
}
}
@Test
public void testTagVersionsOrphanS3DeleteMarker()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 version summary for an S3 delete marker.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
s3VersionSummary.setIsDeleteMarker(true);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Call the method under test.
s3DaoImpl.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, Collections.singletonList(s3VersionSummary), tag);
// Verify the external calls.
verifyNoMoreInteractionsHelper();
}
private Record createRecordForListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
final Map<String, Object> values = new HashMap<>();
values.put(KEY, versionSummary.getKey());
values.put(BUCKET, versionSummary.getBucketName());
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
values.put(OWNER, versionSummary.getOwner().getId());
}
values.put(ETAG, versionSummary.getETag());
values.put(LAST_MODIFIED, new Timestamp(versionSummary.getLastModified().getTime()));
values.put(SIZE, versionSummary.getSize());
values.put(STORAGE_CLASS, versionSummary.getStorageClass());
values.put(IS_LATEST, versionSummary.isLatest());
final String versionId = versionSummary.getVersionId();
if (versionId != null && !versionId.equals(Constants.NULL_VERSION_ID)) {
values.put(VERSION_ID, versionSummary.getVersionId());
}
if (taggingResult != null) {
final Map<String, String> tags = new HashMap<>();
taggingResult.getTagSet().forEach(tag -> {
tags.put(tag.getKey(), tag.getValue());
});
values.put(TAGS, tags);
}
if (objectMetadata != null) {
values.put(USER_METADATA, objectMetadata.getUserMetadata());
}
return new MapRecord(RECORD_SCHEMA, values);
}
@Override
public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(versionSummary.getLastModified().getTime()));
attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
if (taggingResult != null) {
final List<Tag> tags = taggingResult.getTagSet();
for (final Tag tag : tags) {
attributes.put("s3.tag." + tag.getKey(), tag.getValue());
}
}
if (objectMetadata != null) {
for (Map.Entry<String, String> e : objectMetadata.getUserMetadata().entrySet()) {
attributes.put("s3.user.metadata." + e.getKey(), e.getValue());
}
}
// Create the flowfile
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
/**
* Delete an S3 bucket using the provided client. Coming from AWS documentation:
* https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
* @param s3Client the AmazonS3 client instance used to delete the bucket
* @param bucketName a String containing the bucket name
*/
public static void deleteBucket(AmazonS3 s3Client, String bucketName) {
// Delete all objects from the bucket. This is sufficient
// for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
// delete markers for all objects, but doesn't delete the object versions.
// To delete objects from versioned buckets, delete all of the object versions before deleting
// the bucket (see below for an example).
ObjectListing objectListing = s3Client.listObjects(bucketName);
while (true) {
Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator();
while (objIter.hasNext()) {
s3Client.deleteObject(bucketName, objIter.next().getKey());
}
// If the bucket contains many objects, the listObjects() call
// might not return all of the objects in the first listing. Check to
// see whether the listing was truncated. If so, retrieve the next page of objects
// and delete them.
if (objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
} else {
break;
}
}
// Delete all object versions (required for versioned buckets).
VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName));
while (true) {
Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator();
while (versionIter.hasNext()) {
S3VersionSummary vs = versionIter.next();
s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId());
}
if (versionList.isTruncated()) {
versionList = s3Client.listNextBatchOfVersions(versionList);
} else {
break;
}
}
// After all objects and object versions are deleted, delete the bucket.
s3Client.deleteBucket(bucketName);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
try {
restoreState(context);
} catch (IOException ioe) {
getLogger().error("Failed to restore processor state; yielding", ioe);
context.yield();
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final AmazonS3 client = getClient();
int listCount = 0;
long maxTimestamp = 0L;
String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
S3BucketLister bucketLister = useVersions
? new S3VersionBucketLister(client)
: new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
if (delimiter != null && !delimiter.isEmpty()) {
bucketLister.setDelimiter(delimiter);
}
if (prefix != null && !prefix.isEmpty()) {
bucketLister.setPrefix(prefix);
}
VersionListing versionListing;
do {
versionListing = bucketLister.listVersions();
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) {
continue;
}
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(lastModified));
attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
// Create the flowfile
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
currentKeys.clear();
}
if (lastModified == maxTimestamp) {
currentKeys.add(versionSummary.getKey());
}
listCount++;
}
bucketLister.setNextMarker();
commit(context, session, listCount);
listCount = 0;
} while (bucketLister.isTruncated());
currentTimestamp = maxTimestamp;
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
if (!commit(context, session, listCount)) {
if (currentTimestamp > 0) {
persistState(context);
}
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
context.yield();
}
}
@Override
public List<S3VersionSummary> listVersions(S3FileTransferRequestParamsDto params)
{
return s3Dao.listVersions(params);
}
@Override
public void tagVersions(final S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto, final S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto,
final List<S3VersionSummary> s3VersionSummaries, final Tag tag)
{
s3Dao.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, s3VersionSummaries, tag);
}
@Test
public void testExecuteS3SpecificSteps()
{
// Create a business object data key.
BusinessObjectDataKey businessObjectDataKey =
new BusinessObjectDataKey(BDEF_NAMESPACE, BDEF_NAME, FORMAT_USAGE_CODE, FORMAT_FILE_TYPE_CODE, FORMAT_VERSION, PARTITION_VALUE,
NO_SUBPARTITION_VALUES, DATA_VERSION);
// Create a business object data destroy parameters DTO.
BusinessObjectDataDestroyDto businessObjectDataDestroyDto =
new BusinessObjectDataDestroyDto(businessObjectDataKey, STORAGE_NAME, BusinessObjectDataStatusEntity.DELETED, BusinessObjectDataStatusEntity.VALID,
StorageUnitStatusEntity.DISABLING, StorageUnitStatusEntity.ENABLED, S3_ENDPOINT, S3_BUCKET_NAME, TEST_S3_KEY_PREFIX, S3_OBJECT_TAG_KEY,
S3_OBJECT_TAG_VALUE, S3_OBJECT_TAGGER_ROLE_ARN, S3_OBJECT_TAGGER_ROLE_SESSION_NAME, BDATA_FINAL_DESTROY_DELAY_IN_DAYS);
// Create an S3 file transfer parameters DTO to access the S3 bucket.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
// Create an S3 file transfer parameters DTO to be used for S3 object tagging operation.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create a list of all S3 versions matching the S3 key prefix form the S3 bucket.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
List<S3VersionSummary> s3VersionSummaries = Collections.singletonList(s3VersionSummary);
// Create an updated S3 file transfer parameters DTO to access the S3 bucket.
S3FileTransferRequestParamsDto updatedS3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
updatedS3FileTransferRequestParamsDto.setS3Endpoint(S3_ENDPOINT);
updatedS3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
updatedS3FileTransferRequestParamsDto.setS3KeyPrefix(TEST_S3_KEY_PREFIX + "/");
// Create an updated S3 file transfer parameters DTO to be used for S3 object tagging operation.
S3FileTransferRequestParamsDto updatedS3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
updatedS3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
updatedS3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
updatedS3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
updatedS3ObjectTaggerParamsDto.setS3Endpoint(S3_ENDPOINT);
// Mock the external calls.
when(storageHelper.getS3FileTransferRequestParamsDto()).thenReturn(s3FileTransferRequestParamsDto);
when(storageHelper.getS3FileTransferRequestParamsDtoByRole(S3_OBJECT_TAGGER_ROLE_ARN, S3_OBJECT_TAGGER_ROLE_SESSION_NAME))
.thenReturn(s3ObjectTaggerParamsDto);
when(s3Service.listVersions(s3FileTransferRequestParamsDto)).thenReturn(s3VersionSummaries);
// Call the method under test.
businessObjectDataInitiateDestroyHelperServiceImpl.executeS3SpecificSteps(businessObjectDataDestroyDto);
// Validate the results.
assertEquals(
new BusinessObjectDataDestroyDto(businessObjectDataKey, STORAGE_NAME, BusinessObjectDataStatusEntity.DELETED, BusinessObjectDataStatusEntity.VALID,
StorageUnitStatusEntity.DISABLING, StorageUnitStatusEntity.ENABLED, S3_ENDPOINT, S3_BUCKET_NAME, TEST_S3_KEY_PREFIX, S3_OBJECT_TAG_KEY,
S3_OBJECT_TAG_VALUE, S3_OBJECT_TAGGER_ROLE_ARN, S3_OBJECT_TAGGER_ROLE_SESSION_NAME, BDATA_FINAL_DESTROY_DELAY_IN_DAYS),
businessObjectDataDestroyDto);
// Verify the external calls.
verify(storageHelper).getS3FileTransferRequestParamsDto();
verify(storageHelper).getS3FileTransferRequestParamsDtoByRole(S3_OBJECT_TAGGER_ROLE_ARN, S3_OBJECT_TAGGER_ROLE_SESSION_NAME);
verify(s3Service).listVersions(s3FileTransferRequestParamsDto);
verify(s3Service).tagVersions(updatedS3FileTransferRequestParamsDto, updatedS3ObjectTaggerParamsDto, s3VersionSummaries,
new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE));
verifyNoMoreInteractionsHelper();
}
@Override
public void deleteDirectory(final S3FileTransferRequestParamsDto params)
{
LOGGER.info("Deleting keys/key versions from S3... s3KeyPrefix=\"{}\" s3BucketName=\"{}\"", params.getS3KeyPrefix(), params.getS3BucketName());
Assert.isTrue(!isRootKeyPrefix(params.getS3KeyPrefix()), "Deleting from root directory is not allowed.");
try
{
// List S3 versions.
List<S3VersionSummary> s3VersionSummaries = listVersions(params);
LOGGER.info("Found keys/key versions in S3 for deletion. s3KeyCount={} s3KeyPrefix=\"{}\" s3BucketName=\"{}\"", s3VersionSummaries.size(),
params.getS3KeyPrefix(), params.getS3BucketName());
// In order to avoid a MalformedXML AWS exception, we send delete request only when we have any key versions to delete.
if (CollectionUtils.isNotEmpty(s3VersionSummaries))
{
// Create an S3 client.
AmazonS3Client s3Client = getAmazonS3(params);
// Build a list of objects to be deleted.
List<DeleteObjectsRequest.KeyVersion> keyVersions = new ArrayList<>();
for (S3VersionSummary s3VersionSummary : s3VersionSummaries)
{
keyVersions.add(new DeleteObjectsRequest.KeyVersion(s3VersionSummary.getKey(), s3VersionSummary.getVersionId()));
}
try
{
// Delete the key versions.
deleteKeyVersions(s3Client, params.getS3BucketName(), keyVersions);
}
finally
{
s3Client.shutdown();
}
}
}
catch (AmazonClientException e)
{
throw new IllegalStateException(String
.format("Failed to delete keys/key versions with prefix \"%s\" from bucket \"%s\". Reason: %s", params.getS3KeyPrefix(),
params.getS3BucketName(), e.getMessage()), e);
}
}
@Override
public List<S3VersionSummary> listVersions(final S3FileTransferRequestParamsDto params)
{
Assert.isTrue(!isRootKeyPrefix(params.getS3KeyPrefix()), "Listing of S3 versions from root directory is not allowed.");
AmazonS3Client s3Client = getAmazonS3(params);
List<S3VersionSummary> s3VersionSummaries = new ArrayList<>();
try
{
ListVersionsRequest listVersionsRequest = new ListVersionsRequest().withBucketName(params.getS3BucketName()).withPrefix(params.getS3KeyPrefix());
VersionListing versionListing;
do
{
versionListing = s3Operations.listVersions(listVersionsRequest, s3Client);
s3VersionSummaries.addAll(versionListing.getVersionSummaries());
listVersionsRequest.setKeyMarker(versionListing.getNextKeyMarker());
listVersionsRequest.setVersionIdMarker(versionListing.getNextVersionIdMarker());
}
while (versionListing.isTruncated());
}
catch (AmazonS3Exception amazonS3Exception)
{
if (S3Operations.ERROR_CODE_NO_SUCH_BUCKET.equals(amazonS3Exception.getErrorCode()))
{
throw new IllegalArgumentException("The specified bucket '" + params.getS3BucketName() + "' does not exist.", amazonS3Exception);
}
throw new IllegalStateException("Error accessing S3", amazonS3Exception);
}
catch (AmazonClientException e)
{
throw new IllegalStateException(String
.format("Failed to list S3 versions with prefix \"%s\" from bucket \"%s\". Reason: %s", params.getS3KeyPrefix(), params.getS3BucketName(),
e.getMessage()), e);
}
finally
{
// Shutdown the AmazonS3Client instance to release resources.
s3Client.shutdown();
}
return s3VersionSummaries;
}
private void tagVersionsHelper(final S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto,
final S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto, final List<S3VersionSummary> s3VersionSummaries, final Tag tag)
{
// Initialize an S3 version for the error message in the catch block.
S3VersionSummary currentS3VersionSummary = s3VersionSummaries.get(0);
// Amazon S3 client to access S3 objects.
AmazonS3Client s3Client = null;
// Amazon S3 client for S3 object tagging.
AmazonS3Client s3ObjectTaggerClient = null;
try
{
// Create an S3 client to access S3 objects.
s3Client = getAmazonS3(s3FileTransferRequestParamsDto);
// Create an S3 client for S3 object tagging.
s3ObjectTaggerClient = getAmazonS3(s3ObjectTaggerParamsDto);
// Create a get object tagging request.
GetObjectTaggingRequest getObjectTaggingRequest = new GetObjectTaggingRequest(s3FileTransferRequestParamsDto.getS3BucketName(), null, null);
// Create a set object tagging request.
SetObjectTaggingRequest setObjectTaggingRequest = new SetObjectTaggingRequest(s3FileTransferRequestParamsDto.getS3BucketName(), null, null, null);
for (S3VersionSummary s3VersionSummary : s3VersionSummaries)
{
// Set the current S3 version summary.
currentS3VersionSummary = s3VersionSummary;
// Retrieve the current tagging information for the S3 version.
getObjectTaggingRequest.setKey(s3VersionSummary.getKey());
getObjectTaggingRequest.setVersionId(s3VersionSummary.getVersionId());
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(getObjectTaggingRequest, s3Client);
// Update the list of tags to include the specified S3 object tag.
List<Tag> updatedTags = new ArrayList<>();
updatedTags.add(tag);
if (CollectionUtils.isNotEmpty(getObjectTaggingResult.getTagSet()))
{
for (Tag currentTag : getObjectTaggingResult.getTagSet())
{
if (!StringUtils.equals(tag.getKey(), currentTag.getKey()))
{
updatedTags.add(currentTag);
}
}
}
// Update tagging information for the S3 version.
setObjectTaggingRequest.setKey(s3VersionSummary.getKey());
setObjectTaggingRequest.setVersionId(s3VersionSummary.getVersionId());
setObjectTaggingRequest.setTagging(new ObjectTagging(updatedTags));
s3Operations.setObjectTagging(setObjectTaggingRequest, s3ObjectTaggerClient);
}
}
catch (Exception e)
{
throw new IllegalStateException(String
.format("Failed to tag S3 object with \"%s\" key and \"%s\" version id in \"%s\" bucket. Reason: %s", currentS3VersionSummary.getKey(),
currentS3VersionSummary.getVersionId(), s3FileTransferRequestParamsDto.getS3BucketName(), e.getMessage()), e);
}
finally
{
if (s3Client != null)
{
s3Client.shutdown();
}
if (s3ObjectTaggerClient != null)
{
s3ObjectTaggerClient.shutdown();
}
}
}
@Test
public void testListVersionsAssertKeyAndVersionIdMarker()
{
S3Operations originalS3Operations = (S3Operations) ReflectionTestUtils.getField(s3Dao, "s3Operations");
S3Operations mockS3Operations = mock(S3Operations.class);
ReflectionTestUtils.setField(s3Dao, "s3Operations", mockS3Operations);
try
{
String s3BucketName = "s3BucketName";
String s3KeyPrefix = "s3KeyPrefix";
String expectedKey = "key";
String expectedVersionId = "versionId";
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(s3BucketName);
s3FileTransferRequestParamsDto.setS3KeyPrefix(s3KeyPrefix);
when(mockS3Operations.listVersions(any(), any())).then(new Answer<VersionListing>()
{
@Override
public VersionListing answer(InvocationOnMock invocation) throws Throwable
{
ListVersionsRequest listVersionsRequest = invocation.getArgument(0);
String keyMarker = listVersionsRequest.getKeyMarker();
String versionIdMarker = listVersionsRequest.getVersionIdMarker();
VersionListing versionListing = new VersionListing();
if (keyMarker == null || versionIdMarker == null)
{
versionListing.setTruncated(true);
versionListing.setNextKeyMarker("nextKeyMarker");
versionListing.setNextVersionIdMarker("nextVersionIdMarker");
}
else
{
assertEquals("nextKeyMarker", listVersionsRequest.getKeyMarker());
assertEquals("nextVersionIdMarker", listVersionsRequest.getVersionIdMarker());
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(expectedKey);
s3VersionSummary.setVersionId(expectedVersionId);
versionListing.getVersionSummaries().add(s3VersionSummary);
}
return versionListing;
}
});
List<S3VersionSummary> result = s3Dao.listVersions(s3FileTransferRequestParamsDto);
assertEquals(1, result.size());
assertEquals(expectedKey, result.get(0).getKey());
assertEquals(expectedVersionId, result.get(0).getVersionId());
}
finally
{
ReflectionTestUtils.setField(s3Dao, "s3Operations", originalS3Operations);
}
}
/**
* {@inheritDoc}
* <p/>
* If the bucket does not exist, returns a listing with an empty list. If a prefix is specified in listVersionsRequest, only versions starting with the
* prefix will be returned.
*/
@Override
public VersionListing listVersions(ListVersionsRequest listVersionsRequest, AmazonS3 s3Client)
{
LOGGER.debug("listVersions(): listVersionsRequest.getBucketName() = " + listVersionsRequest.getBucketName());
String bucketName = listVersionsRequest.getBucketName();
if (MOCK_S3_BUCKET_NAME_NO_SUCH_BUCKET_EXCEPTION.equals(bucketName))
{
AmazonS3Exception amazonS3Exception = new AmazonS3Exception(MOCK_S3_BUCKET_NAME_NO_SUCH_BUCKET_EXCEPTION);
amazonS3Exception.setErrorCode("NoSuchBucket");
throw amazonS3Exception;
}
else if (MOCK_S3_BUCKET_NAME_INTERNAL_ERROR.equals(bucketName))
{
throw new AmazonServiceException(S3Operations.ERROR_CODE_INTERNAL_ERROR);
}
VersionListing versionListing = new VersionListing();
versionListing.setBucketName(bucketName);
MockS3Bucket mockS3Bucket = mockS3Buckets.get(bucketName);
if (mockS3Bucket != null)
{
for (MockS3Object mockS3Object : mockS3Bucket.getVersions().values())
{
String s3ObjectKey = mockS3Object.getKey();
if (listVersionsRequest.getPrefix() == null || s3ObjectKey.startsWith(listVersionsRequest.getPrefix()))
{
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setBucketName(bucketName);
s3VersionSummary.setKey(s3ObjectKey);
s3VersionSummary.setVersionId(mockS3Object.getVersion());
s3VersionSummary.setSize(mockS3Object.getData().length);
s3VersionSummary.setStorageClass(mockS3Object.getObjectMetadata() != null ? mockS3Object.getObjectMetadata().getStorageClass() : null);
versionListing.getVersionSummaries().add(s3VersionSummary);
}
}
}
return versionListing;
}
@Test
public void testDeleteDirectoryMultiObjectDeleteException()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
s3FileTransferRequestParamsDto.setS3KeyPrefix(S3_KEY_PREFIX);
// Create a retry policy.
RetryPolicy retryPolicy =
new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, INTEGER_VALUE, true);
// Create an S3 version summary.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
// Create a version listing.
VersionListing versionListing = new VersionListing();
versionListing.setVersionSummaries(Collections.singletonList(s3VersionSummary));
// Create a delete error.
MultiObjectDeleteException.DeleteError deleteError = new MultiObjectDeleteException.DeleteError();
deleteError.setKey(S3_KEY);
deleteError.setVersionId(S3_VERSION_ID);
deleteError.setCode(ERROR_CODE);
deleteError.setMessage(ERROR_MESSAGE);
// Create a multi object delete exception.
MultiObjectDeleteException multiObjectDeleteException = new MultiObjectDeleteException(Collections.singletonList(deleteError), new ArrayList<>());
// Mock the external calls.
when(retryPolicyFactory.getRetryPolicy()).thenReturn(retryPolicy);
when(s3Operations.listVersions(any(ListVersionsRequest.class), any(AmazonS3Client.class))).thenReturn(versionListing);
when(s3Operations.deleteObjects(any(DeleteObjectsRequest.class), any(AmazonS3Client.class))).thenThrow(multiObjectDeleteException);
// Try to call the method under test.
try
{
s3DaoImpl.deleteDirectory(s3FileTransferRequestParamsDto);
}
catch (IllegalStateException e)
{
assertEquals(String.format(
"Failed to delete keys/key versions with prefix \"%s\" from bucket \"%s\". Reason: One or more objects could not be deleted " +
"(Service: null; Status Code: 0; Error Code: null; Request ID: null; S3 Extended Request ID: null)", S3_KEY_PREFIX, S3_BUCKET_NAME),
e.getMessage());
}
// Verify the external calls.
verify(retryPolicyFactory, times(2)).getRetryPolicy();
verify(s3Operations).listVersions(any(ListVersionsRequest.class), any(AmazonS3Client.class));
verify(s3Operations).deleteObjects(any(DeleteObjectsRequest.class), any(AmazonS3Client.class));
verifyNoMoreInteractionsHelper();
}
private void runTagVersionsTest()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 version summary.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Create a retry policy.
RetryPolicy retryPolicy =
new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, INTEGER_VALUE, true);
// Create a get object tagging result.
GetObjectTaggingResult getObjectTaggingResult = new GetObjectTaggingResult(null);
// Create a set object tagging result.
SetObjectTaggingResult setObjectTaggingResult = new SetObjectTaggingResult();
// Mock the external calls.
when(retryPolicyFactory.getRetryPolicy()).thenReturn(retryPolicy);
when(s3Operations.getObjectTagging(any(GetObjectTaggingRequest.class), any(AmazonS3Client.class))).thenReturn(getObjectTaggingResult);
when(s3Operations.setObjectTagging(any(SetObjectTaggingRequest.class), any(AmazonS3Client.class))).thenReturn(setObjectTaggingResult);
// Call the method under test.
s3DaoImpl.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, Collections.singletonList(s3VersionSummary), tag);
// Verify the external calls.
verify(retryPolicyFactory, times(2)).getRetryPolicy();
verify(s3Operations).getObjectTagging(any(GetObjectTaggingRequest.class), any(AmazonS3Client.class));
verify(s3Operations).setObjectTagging(any(SetObjectTaggingRequest.class), any(AmazonS3Client.class));
verifyNoMoreInteractionsHelper();
}
private void delete(VersionListing versionListing) {
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
s3.deleteVersion(versionSummary.getBucketName(), versionSummary.getKey(), versionSummary.getVersionId());
}
}
@Override
public void addToListing(final S3VersionSummary summary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) throws IOException {
recordWriter.write(createRecordForListing(summary, taggingResult, objectMetadata));
}
@Test
public void testListVersions() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
runner.setProperty(ListS3.USE_VERSIONS, "true");
Date lastModified = new Date();
VersionListing versionListing = new VersionListing();
S3VersionSummary versionSummary1 = new S3VersionSummary();
versionSummary1.setBucketName("test-bucket");
versionSummary1.setKey("test-key");
versionSummary1.setVersionId("1");
versionSummary1.setLastModified(lastModified);
versionListing.getVersionSummaries().add(versionSummary1);
S3VersionSummary versionSummary2 = new S3VersionSummary();
versionSummary2.setBucketName("test-bucket");
versionSummary2.setKey("test-key");
versionSummary2.setVersionId("2");
versionSummary2.setLastModified(lastModified);
versionListing.getVersionSummaries().add(versionSummary2);
Mockito.when(mockS3Client.listVersions(Mockito.any(ListVersionsRequest.class))).thenReturn(versionListing);
runner.run();
ArgumentCaptor<ListVersionsRequest> captureRequest = ArgumentCaptor.forClass(ListVersionsRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listVersions(captureRequest.capture());
ListVersionsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
Mockito.verify(mockS3Client, Mockito.never()).listObjects(Mockito.any(ListObjectsRequest.class));
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "test-key");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
ff0.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime()));
ff0.assertAttributeEquals("s3.version", "1");
MockFlowFile ff1 = flowFiles.get(1);
ff1.assertAttributeEquals("filename", "test-key");
ff1.assertAttributeEquals("s3.bucket", "test-bucket");
ff1.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime()));
ff1.assertAttributeEquals("s3.version", "2");
}
/**
* Lists all S3 versions matching the S3 key prefix in the given bucket (S3 bucket name).
*
* @param s3FileTransferRequestParamsDto the S3 file transfer request parameters. The S3 bucket name and S3 key prefix identify the S3 versions to get
* listed.
*
* @return the list of all S3 versions that match the prefix in the given bucket
*/
List<S3VersionSummary> listVersions(S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto);
/**
* Tags S3 versions with the specified S3 object tag.
*
* @param s3FileTransferRequestParamsDto the S3 file transfer request parameters
* @param s3ObjectTaggerParamsDto the S3 file transfer request parameters to be used for tagging S3 objects
* @param s3VersionSummaries the list of S3 versions to be tagged
* @param tag the S3 object tag
*/
void tagVersions(final S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto, final S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto,
final List<S3VersionSummary> s3VersionSummaries, final Tag tag);
/**
* Lists all S3 versions matching the S3 key prefix in the given bucket (S3 bucket name). The S3 bucket name and S3 key prefix that identify the S3 versions
* to get listed are taken from the S3 file transfer request parameters DTO.
*
* @param params the S3 file transfer request parameters
*
* @return the list of all S3 versions that match the prefix in the given bucket
*/
List<S3VersionSummary> listVersions(final S3FileTransferRequestParamsDto params);
/**
* Tags S3 versions with the specified S3 object tag.
*
* @param s3FileTransferRequestParamsDto the S3 file transfer request parameters. This set of parameters contains the S3 bucket name
* @param s3ObjectTaggerParamsDto the S3 file transfer request parameters to be used for tagging S3 objects
* @param s3VersionSummaries the list of S3 versions to be tagged
* @param tag the S3 object tag
*/
void tagVersions(final S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto, final S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto,
final List<S3VersionSummary> s3VersionSummaries, final Tag tag);
void addToListing(S3VersionSummary summary, GetObjectTaggingResult taggingResult, ObjectMetadata objectMetadata) throws IOException;