下面列出了怎么用com.amazonaws.services.s3.model.Tag的API类实例代码及写法,或者点击链接到github查看源代码。
void put(String name, byte[] content, Retention retention, ObjectMetadata objectMetadata) {
Preconditions.checkNotNull(objectMetadata);
objectMetadata.setContentLength(content.length);
PutObjectRequest putRequest = new PutObjectRequest(
s3BlobStorageConfigurationProperties.getBucket(),
getFullName(name),
new ByteArrayInputStream(content),
objectMetadata);
List<Tag> tags = new ArrayList<Tag>();
tags.add(new Tag("retention", retention.toString()));
putRequest.setTagging(new ObjectTagging(tags));
amazonS3.putObject(putRequest);
}
/**
* Executes S3 specific steps required for initiation of a business object data destroy.
*
* @param businessObjectDataDestroyDto the DTO that holds various parameters needed to initiate a business object data destroy
*/
void executeS3SpecificStepsImpl(BusinessObjectDataDestroyDto businessObjectDataDestroyDto)
{
// Create an S3 file transfer parameters DTO to access the S3 bucket.
// Since the S3 key prefix represents a directory, we add a trailing '/' character to it.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = storageHelper.getS3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3Endpoint(businessObjectDataDestroyDto.getS3Endpoint());
s3FileTransferRequestParamsDto.setS3BucketName(businessObjectDataDestroyDto.getS3BucketName());
s3FileTransferRequestParamsDto.setS3KeyPrefix(StringUtils.appendIfMissing(businessObjectDataDestroyDto.getS3KeyPrefix(), "/"));
// Create an S3 file transfer parameters DTO to be used for S3 object tagging operation.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = storageHelper
.getS3FileTransferRequestParamsDtoByRole(businessObjectDataDestroyDto.getS3ObjectTaggerRoleArn(),
businessObjectDataDestroyDto.getS3ObjectTaggerRoleSessionName());
s3ObjectTaggerParamsDto.setS3Endpoint(businessObjectDataDestroyDto.getS3Endpoint());
// Get all S3 objects matching the S3 key prefix from the S3 bucket.
List<S3VersionSummary> s3VersionSummaries = s3Service.listVersions(s3FileTransferRequestParamsDto);
// Tag the S3 objects to initiate the deletion.
s3Service.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, s3VersionSummaries,
new Tag(businessObjectDataDestroyDto.getS3ObjectTagKey(), businessObjectDataDestroyDto.getS3ObjectTagValue()));
}
@Test
public void testTagObjects()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 object summary.
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setKey(S3_KEY);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Call the method under test.
s3Service.tagObjects(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, Collections.singletonList(s3ObjectSummary), tag);
// Verify the external calls.
verify(s3Dao).tagObjects(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, Collections.singletonList(s3ObjectSummary), tag);
verifyNoMoreInteractions(s3Dao);
}
@Test
public void testTagObjects()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 object summary.
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setKey(TARGET_S3_KEY);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Put a file in S3.
s3Operations.putObject(new PutObjectRequest(S3_BUCKET_NAME, TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), new ObjectMetadata()), null);
// Tag the file with an S3 object tag.
s3Dao.tagObjects(params, new S3FileTransferRequestParamsDto(), Collections.singletonList(s3ObjectSummary), tag);
// Validate that the object got tagged.
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(new GetObjectTaggingRequest(S3_BUCKET_NAME, TARGET_S3_KEY), null);
assertEquals(Collections.singletonList(tag), getObjectTaggingResult.getTagSet());
}
@Test
public void testTagObjectsAmazonServiceException()
{
// Create an S3 file transfer request parameters DTO to access S3 objects with a mocked S3 bucket name that would trigger an AWS exception.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_INTERNAL_ERROR);
// Create an S3 object summary.
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setKey(TARGET_S3_KEY);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
try
{
s3Dao.tagObjects(params, new S3FileTransferRequestParamsDto(), Collections.singletonList(s3ObjectSummary), tag);
fail();
}
catch (IllegalStateException e)
{
assertEquals(String.format("Failed to tag S3 object with \"%s\" key and \"null\" version id in \"%s\" bucket. " +
"Reason: InternalError (Service: null; Status Code: 0; Error Code: InternalError; Request ID: null)", TARGET_S3_KEY,
MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_INTERNAL_ERROR), e.getMessage());
}
}
@Test
public void testTagObjectsNoS3ObjectSummaries()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Call the method under test with a list of S3 object summaries passed as null.
s3DaoImpl.tagObjects(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, null, tag);
// Verify the external calls.
verifyNoMoreInteractionsHelper();
}
@Test
public void testTagVersionsNoS3VersionSummaries()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Call the method under test with a list of S3 version summaries passed as null.
s3DaoImpl.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, null, tag);
// Verify the external calls.
verifyNoMoreInteractionsHelper();
}
private List<Tag> getObjectTags(ProcessContext context, FlowFile flowFile) {
final String prefix = context.getProperty(OBJECT_TAGS_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
final List<Tag> objectTags = new ArrayList<>();
final Map<String, String> attributesMap = flowFile.getAttributes();
attributesMap.entrySet().stream().sequential()
.filter(attribute -> attribute.getKey().startsWith(prefix))
.forEach(attribute -> {
String tagKey = attribute.getKey();
String tagValue = attribute.getValue();
if (context.getProperty(REMOVE_TAG_PREFIX).asBoolean()) {
tagKey = tagKey.replace(prefix, "");
}
objectTags.add(new Tag(tagKey, tagValue));
});
return objectTags;
}
@Test
public void testObjectTags() {
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "false");
prepareTest();
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
List<Tag> tagSet = request.getTagging().getTagSet();
assertEquals(1, tagSet.size());
assertEquals("tagS3PII", tagSet.get(0).getKey());
assertEquals("true", tagSet.get(0).getValue());
}
@Test
public void testTagObjectVersion() throws IOException {
final String tagKey = "k";
final String tagVal = "v";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.VERSION_ID, "test-version");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
runner.setProperty(TagS3Object.APPEND_TAG, "false");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "object-key");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).setObjectTagging(captureRequest.capture());
SetObjectTaggingRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("object-key", request.getKey());
assertEquals("test-version", request.getVersionId());
assertTrue("Expected tag not found in request", request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
}
@Test
public void testTagObjectS3Exception() {
//set up existing tags on S3 object
Tag currentTag = new Tag("ck", "cv");
mockGetExistingTags(currentTag);
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
runner.enqueue(new byte[0], attrs);
Mockito.doThrow(new AmazonS3Exception("TagFailure")).when(mockS3Client).setObjectTagging(Mockito.any());
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
}
/**
* Instantiates a new bucket VH.
*
* @param bucket the bucket
* @param location the location
* @param versionConfig the version config
* @param tags the tags
*/
public BucketVH(Bucket bucket,String location,BucketVersioningConfiguration versionConfig, List<Tag> tags, String bucketEncryp, boolean websiteConfiguration,BucketLoggingConfiguration bucketLoggingConfiguration){
this.bucket = bucket;
this.location = location;
this.versionStatus = versionConfig==null?"":versionConfig.getStatus();
this.mfaDelete = versionConfig==null?null:versionConfig.isMfaDeleteEnabled();
this.tags = tags;
this.bucketEncryp = bucketEncryp;
this.websiteConfiguration = websiteConfiguration;
this.isLoggingEnabled = bucketLoggingConfiguration==null?null:bucketLoggingConfiguration.isLoggingEnabled();
this.destinationBucketName = bucketLoggingConfiguration==null?"":bucketLoggingConfiguration.getDestinationBucketName();
this.logFilePrefix = bucketLoggingConfiguration==null?"":bucketLoggingConfiguration.getLogFilePrefix();
}
/**
* Executes a storage policy transition as per specified storage policy selection.
*
* @param storagePolicyTransitionParamsDto the storage policy transition DTO that contains parameters needed to perform a storage policy transition
*/
protected void executeStoragePolicyTransitionImpl(StoragePolicyTransitionParamsDto storagePolicyTransitionParamsDto)
{
// Create an S3 file transfer parameters DTO to access the S3 bucket.
// Since the S3 key prefix represents a directory, we add a trailing '/' character to it.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = storageHelper.getS3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3Endpoint(storagePolicyTransitionParamsDto.getS3Endpoint());
s3FileTransferRequestParamsDto.setS3BucketName(storagePolicyTransitionParamsDto.getS3BucketName());
s3FileTransferRequestParamsDto.setS3KeyPrefix(StringUtils.appendIfMissing(storagePolicyTransitionParamsDto.getS3KeyPrefix(), "/"));
// Create an S3 file transfer parameters DTO to be used for S3 object tagging operation.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = storageHelper
.getS3FileTransferRequestParamsDtoByRole(storagePolicyTransitionParamsDto.getS3ObjectTaggerRoleArn(),
storagePolicyTransitionParamsDto.getS3ObjectTaggerRoleSessionName());
s3ObjectTaggerParamsDto.setS3Endpoint(storagePolicyTransitionParamsDto.getS3Endpoint());
// Get actual S3 files by selecting all S3 keys matching the S3 key prefix form the S3 bucket.
// When listing S3 files, we ignore 0 byte objects that represent S3 directories.
List<S3ObjectSummary> actualS3FilesWithoutZeroByteDirectoryMarkers = s3Service.listDirectory(s3FileTransferRequestParamsDto, true);
// Validate existence of the S3 files.
storageFileHelper.validateRegisteredS3Files(storagePolicyTransitionParamsDto.getStorageFiles(), actualS3FilesWithoutZeroByteDirectoryMarkers,
storagePolicyTransitionParamsDto.getStorageName(), storagePolicyTransitionParamsDto.getBusinessObjectDataKey());
// Get actual S3 files by selecting all S3 keys matching the S3 key prefix form the S3 bucket.
// This time, we do not ignore 0 byte objects that represent S3 directories.
List<S3ObjectSummary> actualS3Files = s3Service.listDirectory(s3FileTransferRequestParamsDto, false);
// Tag the S3 objects to initiate the archiving.
s3Service.tagObjects(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, actualS3Files,
new Tag(storagePolicyTransitionParamsDto.getS3ObjectTagKey(), storagePolicyTransitionParamsDto.getS3ObjectTagValue()));
}
@Test
public void testTagObjectsOtherTagKeyAlreadyExists()
{
// Create two S3 object tags having different tag keys.
List<Tag> tags = Arrays.asList(new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE), new Tag(S3_OBJECT_TAG_KEY_2, S3_OBJECT_TAG_VALUE_2));
// Put a file in S3 that is already tagged with the first S3 object tag.
PutObjectRequest putObjectRequest = new PutObjectRequest(S3_BUCKET_NAME, TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), new ObjectMetadata());
putObjectRequest.setTagging(new ObjectTagging(Collections.singletonList(tags.get(0))));
s3Operations.putObject(putObjectRequest, null);
// Validate that the S3 object is tagged with the first tag only.
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(new GetObjectTaggingRequest(S3_BUCKET_NAME, TARGET_S3_KEY), null);
assertEquals(Collections.singletonList(tags.get(0)), getObjectTaggingResult.getTagSet());
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 object summary.
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setKey(TARGET_S3_KEY);
// Tag the S3 file with the second S3 object tag.
s3Dao.tagObjects(params, new S3FileTransferRequestParamsDto(), Collections.singletonList(s3ObjectSummary), tags.get(1));
// Validate that the S3 object is now tagged with both tags.
getObjectTaggingResult = s3Operations.getObjectTagging(new GetObjectTaggingRequest(S3_BUCKET_NAME, TARGET_S3_KEY), null);
assertEquals(tags.size(), getObjectTaggingResult.getTagSet().size());
assertTrue(getObjectTaggingResult.getTagSet().containsAll(tags));
}
@Test
public void testTagObjectsTargetTagKeyAlreadyExists()
{
// Create two S3 object tags having the same tag key.
List<Tag> tags = Arrays.asList(new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE), new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE_2));
// Put a file in S3 that is already tagged with the first S3 object tag.
PutObjectRequest putObjectRequest = new PutObjectRequest(S3_BUCKET_NAME, TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]), new ObjectMetadata());
putObjectRequest.setTagging(new ObjectTagging(Collections.singletonList(tags.get(0))));
s3Operations.putObject(putObjectRequest, null);
// Validate that the S3 object is tagged with the first tag.
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(new GetObjectTaggingRequest(S3_BUCKET_NAME, TARGET_S3_KEY), null);
assertEquals(Collections.singletonList(tags.get(0)), getObjectTaggingResult.getTagSet());
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 object summary.
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setKey(TARGET_S3_KEY);
// Tag the S3 file with the second S3 object tag.
s3Dao.tagObjects(params, new S3FileTransferRequestParamsDto(), Collections.singletonList(s3ObjectSummary), tags.get(1));
// Validate that the S3 object is tagged with the second tag now.
getObjectTaggingResult = s3Operations.getObjectTagging(new GetObjectTaggingRequest(S3_BUCKET_NAME, TARGET_S3_KEY), null);
assertEquals(Collections.singletonList(tags.get(1)), getObjectTaggingResult.getTagSet());
}
@Test
public void testTagVersionsAmazonServiceException()
{
// Create an S3 file transfer request parameters DTO to access S3 objects with a mocked S3 bucket name that would trigger an AWS exception.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_INTERNAL_ERROR);
// Create an S3 version summary.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
try
{
s3Dao.tagVersions(params, new S3FileTransferRequestParamsDto(), Collections.singletonList(s3VersionSummary), tag);
fail();
}
catch (IllegalStateException e)
{
assertEquals(String.format("Failed to tag S3 object with \"%s\" key and \"%s\" version id in \"%s\" bucket. " +
"Reason: InternalError (Service: null; Status Code: 0; Error Code: InternalError; Request ID: null)", S3_KEY, S3_VERSION_ID,
MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_INTERNAL_ERROR), e.getMessage());
}
}
@Test
public void testTagVersionsOtherTagKeyAlreadyExists()
{
// Create two S3 object tags having different tag keys.
List<Tag> tags = Arrays.asList(new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE), new Tag(S3_OBJECT_TAG_KEY_2, S3_OBJECT_TAG_VALUE_2));
// Put an S3 file that is already tagged with the first S3 object tag in an S3 bucket that has versioning enabled.
PutObjectRequest putObjectRequest =
new PutObjectRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]),
new ObjectMetadata());
putObjectRequest.setTagging(new ObjectTagging(Collections.singletonList(tags.get(0))));
s3Operations.putObject(putObjectRequest, null);
// List S3 versions that match the test S3 key.
ListVersionsRequest listVersionsRequest =
new ListVersionsRequest().withBucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED).withPrefix(TARGET_S3_KEY);
VersionListing versionListing = s3Operations.listVersions(listVersionsRequest, null);
assertEquals(1, CollectionUtils.size(versionListing.getVersionSummaries()));
assertEquals(TARGET_S3_KEY, versionListing.getVersionSummaries().get(0).getKey());
assertNotNull(versionListing.getVersionSummaries().get(0).getVersionId());
// Validate that the S3 object is tagged with the first tag only.
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(
new GetObjectTaggingRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY,
versionListing.getVersionSummaries().get(0).getVersionId()), null);
assertEquals(Collections.singletonList(tags.get(0)), getObjectTaggingResult.getTagSet());
// Tag the S3 version with the second S3 object tag.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED);
s3Dao.tagVersions(params, new S3FileTransferRequestParamsDto(), versionListing.getVersionSummaries(), tags.get(1));
// Validate that the S3 object is now tagged with both tags.
getObjectTaggingResult = s3Operations.getObjectTagging(
new GetObjectTaggingRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY,
versionListing.getVersionSummaries().get(0).getVersionId()), null);
assertEquals(tags.size(), getObjectTaggingResult.getTagSet().size());
assertTrue(getObjectTaggingResult.getTagSet().containsAll(tags));
}
@Test
public void testTagVersionsS3BucketWithVersioningDisabled()
{
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Put S3 objects in S3 bucket that has versioning disabled.
for (int i = 0; i < 2; i++)
{
s3Operations.putObject(new PutObjectRequest(S3_BUCKET_NAME, TARGET_S3_KEY + i, new ByteArrayInputStream(new byte[1]), new ObjectMetadata()), null);
}
// List S3 versions that match the test prefix.
ListVersionsRequest listVersionsRequest = new ListVersionsRequest().withBucketName(S3_BUCKET_NAME).withPrefix(TARGET_S3_KEY);
VersionListing versionListing = s3Operations.listVersions(listVersionsRequest, null);
assertEquals(2, CollectionUtils.size(versionListing.getVersionSummaries()));
for (int i = 0; i < 2; i++)
{
assertNull(versionListing.getVersionSummaries().get(i).getVersionId());
}
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(S3_BUCKET_NAME);
// Tag listed S3 versions with an S3 object tag.
s3Dao.tagVersions(params, new S3FileTransferRequestParamsDto(), versionListing.getVersionSummaries(), tag);
// Validate that both S3 objects got tagged.
for (int i = 0; i < 2; i++)
{
GetObjectTaggingResult getObjectTaggingResult =
s3Operations.getObjectTagging(new GetObjectTaggingRequest(S3_BUCKET_NAME, TARGET_S3_KEY + i, null), null);
assertEquals(Collections.singletonList(tag), getObjectTaggingResult.getTagSet());
}
}
@Test
public void testTagVersionsS3BucketWithVersioningEnabled()
{
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Put two S3 versions in S3 bucket that has versioning enabled.
for (int i = 0; i < 2; i++)
{
s3Operations.putObject(
new PutObjectRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]),
new ObjectMetadata()), null);
}
// List S3 versions that match the test key.
ListVersionsRequest listVersionsRequest =
new ListVersionsRequest().withBucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED).withPrefix(TARGET_S3_KEY);
VersionListing versionListing = s3Operations.listVersions(listVersionsRequest, null);
assertEquals(2, CollectionUtils.size(versionListing.getVersionSummaries()));
for (int i = 0; i < 2; i++)
{
assertNotNull(versionListing.getVersionSummaries().get(i).getVersionId());
}
// Create an S3 file transfer request parameters DTO to access S3 objects with a mocked S3 bucket name that enables S3 bucket versioning.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED);
// Tag listed S3 version with an S3 object tag.
s3Dao.tagVersions(params, new S3FileTransferRequestParamsDto(), versionListing.getVersionSummaries(), tag);
// Validate that both versions got tagged.
for (int i = 0; i < 2; i++)
{
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(
new GetObjectTaggingRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY,
versionListing.getVersionSummaries().get(i).getVersionId()), null);
assertEquals(Collections.singletonList(tag), getObjectTaggingResult.getTagSet());
}
}
@Test
public void testTagVersionsTargetTagKeyAlreadyExists()
{
// Create two S3 object tags having the same tag key.
List<Tag> tags = Arrays.asList(new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE), new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE_2));
// Put an S3 file that is already tagged with the first S3 object tag in an S3 bucket that has versioning enabled.
PutObjectRequest putObjectRequest =
new PutObjectRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY, new ByteArrayInputStream(new byte[1]),
new ObjectMetadata());
putObjectRequest.setTagging(new ObjectTagging(Collections.singletonList(tags.get(0))));
s3Operations.putObject(putObjectRequest, null);
// List S3 versions that match the test S3 key.
ListVersionsRequest listVersionsRequest =
new ListVersionsRequest().withBucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED).withPrefix(TARGET_S3_KEY);
VersionListing versionListing = s3Operations.listVersions(listVersionsRequest, null);
assertEquals(1, CollectionUtils.size(versionListing.getVersionSummaries()));
assertEquals(TARGET_S3_KEY, versionListing.getVersionSummaries().get(0).getKey());
assertNotNull(versionListing.getVersionSummaries().get(0).getVersionId());
// Validate that the S3 object is tagged with the first tag only.
GetObjectTaggingResult getObjectTaggingResult = s3Operations.getObjectTagging(
new GetObjectTaggingRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY,
versionListing.getVersionSummaries().get(0).getVersionId()), null);
assertEquals(Collections.singletonList(tags.get(0)), getObjectTaggingResult.getTagSet());
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto params = new S3FileTransferRequestParamsDto();
params.setS3BucketName(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED);
// Tag the S3 version with the second S3 object tag.
s3Dao.tagVersions(params, new S3FileTransferRequestParamsDto(), versionListing.getVersionSummaries(), tags.get(1));
// Validate that the S3 object is now tagged with the second tag only.
getObjectTaggingResult = s3Operations.getObjectTagging(
new GetObjectTaggingRequest(MockS3OperationsImpl.MOCK_S3_BUCKET_NAME_VERSIONING_ENABLED, TARGET_S3_KEY,
versionListing.getVersionSummaries().get(0).getVersionId()), null);
assertEquals(Collections.singletonList(tags.get(1)), getObjectTaggingResult.getTagSet());
}
@Test
public void testTagObjectsS3ClientCreationFails()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 object summary.
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setKey(S3_KEY);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Mock the external calls.
when(retryPolicyFactory.getRetryPolicy()).thenThrow(new AmazonServiceException(ERROR_MESSAGE));
// Try to call the method under test.
try
{
s3DaoImpl.tagObjects(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, Collections.singletonList(s3ObjectSummary), tag);
}
catch (IllegalStateException e)
{
assertEquals(String.format("Failed to tag S3 object with \"%s\" key and \"null\" version id in \"%s\" bucket. " +
"Reason: %s (Service: null; Status Code: 0; Error Code: null; Request ID: null)", S3_KEY, S3_BUCKET_NAME, ERROR_MESSAGE), e.getMessage());
}
// Verify the external calls.
verify(retryPolicyFactory).getRetryPolicy();
verifyNoMoreInteractionsHelper();
}
@Test
public void testTagVersionsOrphanS3DeleteMarker()
{
// Create an S3 file transfer request parameters DTO to access S3 objects.
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(S3_BUCKET_NAME);
// Create an S3 file transfer request parameters DTO to tag S3 objects.
S3FileTransferRequestParamsDto s3ObjectTaggerParamsDto = new S3FileTransferRequestParamsDto();
s3ObjectTaggerParamsDto.setAwsAccessKeyId(AWS_ASSUMED_ROLE_ACCESS_KEY);
s3ObjectTaggerParamsDto.setAwsSecretKey(AWS_ASSUMED_ROLE_SECRET_KEY);
s3ObjectTaggerParamsDto.setSessionToken(AWS_ASSUMED_ROLE_SESSION_TOKEN);
// Create an S3 version summary for an S3 delete marker.
S3VersionSummary s3VersionSummary = new S3VersionSummary();
s3VersionSummary.setKey(S3_KEY);
s3VersionSummary.setVersionId(S3_VERSION_ID);
s3VersionSummary.setIsDeleteMarker(true);
// Create an S3 object tag.
Tag tag = new Tag(S3_OBJECT_TAG_KEY, S3_OBJECT_TAG_VALUE);
// Call the method under test.
s3DaoImpl.tagVersions(s3FileTransferRequestParamsDto, s3ObjectTaggerParamsDto, Collections.singletonList(s3VersionSummary), tag);
// Verify the external calls.
verifyNoMoreInteractionsHelper();
}
private void changeExistingObject(
Record record,
ELVars variables,
String bucket,
String objectPath
) throws OnRecordErrorException {
// Tag application
if(!config.taskConfig.tags.isEmpty()) {
List<Tag> newTags = new ArrayList<>();
// Evaluate each tag separately
for (Map.Entry<String, String> entry : config.taskConfig.tags.entrySet()) {
newTags.add(new Tag(
evaluate(record, "tags", variables, entry.getKey()),
evaluate(record, "tags", variables, entry.getValue())
));
}
// Apply all tags at once
config.s3Config.getS3Client().setObjectTagging(new SetObjectTaggingRequest(
bucket,
objectPath,
new ObjectTagging(newTags)
));
Events.FILE_CHANGED.create(getContext())
.with("object_key", objectPath)
.createAndSend();
}
}
@Test
public void testUnknownHeader() throws Exception {
String blobName = "test-unknown-header";
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(BYTE_SOURCE.size());
PutObjectRequest request = new PutObjectRequest(
containerName, blobName, BYTE_SOURCE.openStream(), metadata)
.withTagging(new ObjectTagging(ImmutableList.<Tag>of()));
try {
client.putObject(request);
Fail.failBecauseExceptionWasNotThrown(AmazonS3Exception.class);
} catch (AmazonS3Exception e) {
assertThat(e.getErrorCode()).isEqualTo("NotImplemented");
}
}
@Override
public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(versionSummary.getLastModified().getTime()));
attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
if (taggingResult != null) {
final List<Tag> tags = taggingResult.getTagSet();
for (final Tag tag : tags) {
attributes.put("s3.tag." + tag.getKey(), tag.getValue());
}
}
if (objectMetadata != null) {
for (Map.Entry<String, String> e : objectMetadata.getUserMetadata().entrySet()) {
attributes.put("s3.user.metadata." + e.getKey(), e.getValue());
}
}
// Create the flowfile
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
private FlowFile setTagAttributes(ProcessSession session, FlowFile flowFile, List<Tag> tags) {
flowFile = session.removeAllAttributes(flowFile, Pattern.compile("^s3\\.tag\\..*"));
final Map<String, String> tagAttrs = new HashMap<>();
tags.stream().forEach(t -> tagAttrs.put("s3.tag." + t.getKey(), t.getValue()));
flowFile = session.putAllAttributes(flowFile, tagAttrs);
return flowFile;
}
@Test
public void testObjectTags() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "tag-test.txt");
attrs.put("tagS3PII", "true");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
GetObjectTaggingResult result = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, "tag-test.txt"));
List<Tag> objectTags = result.getTagSet();
for (Tag tag : objectTags) {
System.out.println("Tag Key : " + tag.getKey() + ", Tag Value : " + tag.getValue());
}
Assert.assertTrue(objectTags.size() == 1);
Assert.assertEquals("PII", objectTags.get(0).getKey());
Assert.assertEquals("true", objectTags.get(0).getValue());
}
@Test
public void testObjectTagsWritten() {
List<Tag> objectTags = new ArrayList<>();
objectTags.add(new Tag("dummytag1", "dummyvalue1"));
objectTags.add(new Tag("dummytag2", "dummyvalue2"));
putFileWithObjectTag("b/fileWithTag", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), objectTags);
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(ListS3.PREFIX, "b/");
runner.setProperty(ListS3.REGION, REGION);
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
runner.run();
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
MockFlowFile flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
flowFiles.assertAttributeEquals("filename", "b/fileWithTag");
flowFiles.assertAttributeExists("s3.tag.dummytag1");
flowFiles.assertAttributeExists("s3.tag.dummytag2");
flowFiles.assertAttributeEquals("s3.tag.dummytag1", "dummyvalue1");
flowFiles.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2");
}
@Test
public void testSimpleTag() throws Exception {
String objectKey = "test-file";
String tagKey = "nifi-key";
String tagValue = "nifi-val";
// put file in s3
putTestFile(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
// Set up processor
final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(TagS3Object.REGION, REGION);
runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", objectKey);
runner.enqueue(new byte[0], attrs);
// tag file
runner.run(1);
// Verify processor succeeds
runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
// Verify tag exists on S3 object
GetObjectTaggingResult res = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, objectKey));
assertTrue("Expected tag not found on S3 object", res.getTagSet().contains(new Tag(tagKey, tagValue)));
}
@Test
public void testAppendTag() throws Exception {
String objectKey = "test-file";
String tagKey = "nifi-key";
String tagValue = "nifi-val";
Tag existingTag = new Tag("oldkey", "oldvalue");
// put file in s3
putFileWithObjectTag(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), Arrays.asList(existingTag));
// Set up processor
final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(TagS3Object.REGION, REGION);
runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", objectKey);
runner.enqueue(new byte[0], attrs);
// tag file
runner.run(1);
// Verify processor succeeds
runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
// Verify new tag and existing exist on S3 object
GetObjectTaggingResult res = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, objectKey));
assertTrue("Expected new tag not found on S3 object", res.getTagSet().contains(new Tag(tagKey, tagValue)));
assertTrue("Expected existing tag not found on S3 object", res.getTagSet().contains(existingTag));
}