下面列出了 com.amazonaws.services.s3.model.ObjectMetadata#setUserMetadata() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Uploads {@code content} to {@code bucket} under {@code key} with the given user metadata.
 * The request metadata carries the payload length and the Content-MD5 value returned by
 * {@code md5b64(content)}.
 *
 * @param key          object key to write
 * @param content      bytes to upload
 * @param userMetadata user metadata attached to the object
 */
private void putObject(final String key, final byte[] content, Map<String, String> userMetadata) {
    final ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentLength(content.length);
    objectMetadata.setContentMD5(md5b64(content));
    objectMetadata.setUserMetadata(userMetadata);

    // The whole payload is already in memory, so it is streamed straight from the array.
    final ByteArrayInputStream payload = new ByteArrayInputStream(content);
    S3.putObject(new PutObjectRequest(bucket, key, payload, objectMetadata));
}
/**
 * Uploads {@code file} to {@code BUCKET_NAME} under {@code key} with the given user metadata.
 *
 * <p>The content length is set from the file size so the SDK does not need to buffer the
 * whole stream in memory to discover it, and the stream opened here is always closed.
 *
 * @param key          object key to write
 * @param file         local file whose bytes are uploaded
 * @param userMetadata user metadata attached to the object
 * @throws AmazonS3Exception     as raised by the client's {@code putObject} call
 * @throws FileNotFoundException if {@code file} cannot be opened for reading
 */
protected void putFileWithUserMetadata(String key, File file, Map<String, String> userMetadata) throws AmazonS3Exception, FileNotFoundException {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setUserMetadata(userMetadata);
    // Stream uploads without an explicit length are buffered in memory by the SDK.
    objectMetadata.setContentLength(file.length());

    try (FileInputStream input = new FileInputStream(file)) {
        PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, input, objectMetadata);
        client.putObject(putRequest);
    } catch (FileNotFoundException e) {
        // Keep the declared contract: a file that cannot be opened surfaces unchanged.
        throw e;
    } catch (java.io.IOException e) {
        // Only close() can raise a plain IOException here; it is not part of the signature.
        throw new java.io.UncheckedIOException("Failed to close " + file, e);
    }
}
/**
 * Copies one S3 object to another location as configured on the step.
 *
 * <p>Optional user metadata ({@code key:value} entries), cache control, content type, SSE
 * algorithm, canned ACL and KMS key id are applied to the copy request when present. The copy
 * is executed through a {@code TransferManager} that is shut down afterwards.
 *
 * @return the destination in the form {@code s3://<toBucket>/<toPath>}
 * @throws Exception if an argument check fails or the copy does not complete
 */
@Override
public String run() throws Exception {
    final String fromBucket = this.step.getFromBucket();
    final String toBucket = this.step.getToBucket();
    final String fromPath = this.step.getFromPath();
    final String toPath = this.step.getToPath();
    final String kmsId = this.step.getKmsId();
    final Map<String, String> metadatas = new HashMap<>();
    final CannedAccessControlList acl = this.step.getAcl();
    final String cacheControl = this.step.getCacheControl();
    final String contentType = this.step.getContentType();
    final String sseAlgorithm = this.step.getSseAlgorithm();
    final S3ClientOptions s3ClientOptions = this.step.createS3ClientOptions();
    final EnvVars envVars = this.getContext().get(EnvVars.class);

    // Only entries that split into exactly two parts on ':' are kept; anything else is skipped.
    final String[] rawMetadatas = this.step.getMetadatas();
    if (rawMetadatas != null) {
        for (String entry : rawMetadatas) {
            final String[] keyValue = entry.split(":");
            if (keyValue.length == 2) {
                metadatas.put(keyValue[0], keyValue[1]);
            }
        }
    }

    Preconditions.checkArgument(!(fromBucket == null || fromBucket.isEmpty()), "From bucket must not be null or empty");
    Preconditions.checkArgument(!(fromPath == null || fromPath.isEmpty()), "From path must not be null or empty");
    Preconditions.checkArgument(!(toBucket == null || toBucket.isEmpty()), "To bucket must not be null or empty");
    Preconditions.checkArgument(!(toPath == null || toPath.isEmpty()), "To path must not be null or empty");

    TaskListener listener = Execution.this.getContext().get(TaskListener.class);
    listener.getLogger().format("Copying s3://%s/%s to s3://%s/%s%n", fromBucket, fromPath, toBucket, toPath);

    CopyObjectRequest request = new CopyObjectRequest(fromBucket, fromPath, toBucket, toPath);

    // Replacement metadata is attached only if at least one metadata-related option is set.
    final boolean hasUserMetadata = !metadatas.isEmpty();
    final boolean hasCacheControl = cacheControl != null && !cacheControl.isEmpty();
    final boolean hasContentType = contentType != null && !contentType.isEmpty();
    final boolean hasSseAlgorithm = sseAlgorithm != null && !sseAlgorithm.isEmpty();
    if (hasUserMetadata || hasCacheControl || hasContentType || hasSseAlgorithm) {
        ObjectMetadata newMetadata = new ObjectMetadata();
        if (hasUserMetadata) {
            newMetadata.setUserMetadata(metadatas);
        }
        if (hasCacheControl) {
            newMetadata.setCacheControl(cacheControl);
        }
        if (hasContentType) {
            newMetadata.setContentType(contentType);
        }
        if (hasSseAlgorithm) {
            newMetadata.setSSEAlgorithm(sseAlgorithm);
        }
        request.withNewObjectMetadata(newMetadata);
    }

    if (acl != null) {
        request.withCannedAccessControlList(acl);
    }

    if (kmsId != null && !kmsId.isEmpty()) {
        listener.getLogger().format("Using KMS: %s%n", kmsId);
        request.withSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(kmsId));
    }

    TransferManager transferManager = TransferManagerBuilder.standard()
            .withS3Client(AWSClientFactory.create(s3ClientOptions.createAmazonS3ClientBuilder(), envVars))
            .build();
    try {
        final Copy copy = transferManager.copy(request);
        copy.addProgressListener((ProgressListener) progressEvent -> {
            if (progressEvent.getEventType() == ProgressEventType.TRANSFER_COMPLETED_EVENT) {
                listener.getLogger().println("Finished: " + copy.getDescription());
            }
        });
        copy.waitForCompletion();
    } finally {
        // Always release the transfer manager, whether or not the copy succeeded.
        transferManager.shutdownNow();
    }

    listener.getLogger().println("Copy complete");
    return String.format("s3://%s/%s", toBucket, toPath);
}
/**
 * Sends the active block to the store with one PUT request. An empty buffer still produces a
 * 0-byte PUT, because an entry has to be created at the far end.
 *
 * <p>The request metadata carries the user metadata, a content type (falling back to
 * {@code application/octet-stream}) and, when atomic write is enabled, an {@code If-Match} or
 * {@code If-None-Match} header. The upload runs on the executor and this method blocks until
 * it finishes.
 *
 * @throws IOException any problem
 */
private void putObject() throws IOException {
    LOG.debug("Executing regular upload for {}", writeOperationHelper);

    final COSDataBlocks.DataBlock block = getActiveBlock();
    int size = block.dataSize();
    final COSDataBlocks.BlockUploadData uploadData = block.startUpload();

    // A block backed by a file is uploaded from that file; otherwise from its stream.
    final PutObjectRequest putObjectRequest;
    if (uploadData.hasFile()) {
        putObjectRequest = writeOperationHelper.newPutRequest(uploadData.getFile());
    } else {
        putObjectRequest = writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
    }

    final ObjectMetadata om = new ObjectMetadata();
    om.setUserMetadata(mMetadata);
    final boolean hasContentType = contentType != null && !contentType.isEmpty();
    om.setContentType(hasContentType ? contentType : "application/octet-stream");

    // With atomic write enabled the etag (or its absence) guards the PUT via a conditional header.
    if (mAtomicWriteEnabled) {
        if (mEtag == null) {
            LOG.debug("Atomic write - setting If-None-Match header");
            om.setHeader("If-None-Match", "*");
        } else {
            LOG.debug("Atomic write - setting If-Match header");
            om.setHeader("If-Match", mEtag);
        }
    }
    putObjectRequest.setMetadata(om);

    ListenableFuture<PutObjectResult> pendingUpload =
            executorService.submit(new Callable<PutObjectResult>() {
                @Override
                public PutObjectResult call() throws Exception {
                    try {
                        // the putObject call automatically closes the input
                        // stream afterwards.
                        return writeOperationHelper.putObject(putObjectRequest);
                    } finally {
                        closeAll(LOG, uploadData, block);
                    }
                }
            });
    clearActiveBlock();

    // Block until the upload has finished.
    try {
        pendingUpload.get();
    } catch (InterruptedException ie) {
        LOG.warn("Interrupted object upload", ie);
        Thread.currentThread().interrupt();
    } catch (ExecutionException ee) {
        throw extractException("regular upload", key, ee);
    }
}
/**
 * Runs the operator with only {@code BUCKET/KEY} as its command and checks what it does by
 * default: no proxy settings on the client configuration, path-style access disabled, the
 * expected credentials, a metadata request for the expected bucket and key without an SSE
 * customer key, and store params exposing the metadata returned by the stubbed client.
 */
@Test
public void testDefaults()
        throws Exception
{
    Config taskConfig = newConfig();
    taskConfig.set("_command", BUCKET + "/" + KEY);
    when(taskRequest.getConfig()).thenReturn(taskConfig);

    // Metadata the stubbed client hands back for the polled object.
    ObjectMetadata stubbedMetadata = new ObjectMetadata();
    stubbedMetadata.setContentType(CONTENT_TYPE);
    stubbedMetadata.setContentLength(CONTENT_LENGTH);
    stubbedMetadata.setUserMetadata(USER_METADATA);

    Operator operator = factory.newOperator(newContext(projectPath, taskRequest));

    when(s3Client.getObjectMetadata(objectMetadataRequestCaptor.capture())).thenReturn(stubbedMetadata);

    TaskResult result = operator.run();

    // Client configuration: no proxy configured.
    verify(s3ClientFactory).create(credentialsCaptor.capture(), clientConfigurationCaptor.capture());
    ClientConfiguration capturedClientConfig = clientConfigurationCaptor.getValue();
    assertThat(capturedClientConfig.getProxyHost(), is(nullValue()));
    assertThat(capturedClientConfig.getProxyPort(), is(-1));
    assertThat(capturedClientConfig.getProxyUsername(), is(nullValue()));
    assertThat(capturedClientConfig.getProxyPassword(), is(nullValue()));

    // Client options: path-style access is off.
    verify(s3Client).setS3ClientOptions(s3ClientOptionsCaptor.capture());
    S3ClientOptions capturedOptions = s3ClientOptionsCaptor.getValue();
    assertThat(capturedOptions.isPathStyleAccess(), is(false));

    // Credentials handed to the client factory.
    AWSCredentials capturedCredentials = credentialsCaptor.getValue();
    assertThat(capturedCredentials.getAWSAccessKeyId(), is(ACCESS_KEY_ID));
    assertThat(capturedCredentials.getAWSSecretKey(), is(SECRET_ACCESS_KEY));

    // The metadata request issued against the client.
    GetObjectMetadataRequest capturedRequest = objectMetadataRequestCaptor.getValue();
    assertThat(capturedRequest.getKey(), is(KEY));
    assertThat(capturedRequest.getBucketName(), is(BUCKET));
    assertThat(capturedRequest.getSSECustomerKey(), is(nullValue()));

    // Store params must mirror the stubbed metadata under s3.last_object.
    Config expectedStoreParams = newConfig();
    expectedStoreParams
            .getNestedOrSetEmpty("s3")
            .getNestedOrSetEmpty("last_object")
            .set("metadata", stubbedMetadata.getRawMetadata())
            .set("user_metadata", stubbedMetadata.getUserMetadata());

    assertThat(result.getStoreParams(), is(expectedStoreParams));
}
/**
 * Verifies the polling back-off: while the stubbed client keeps throwing
 * {@code NOT_FOUND_EXCEPTION}, each run must end in a non-error {@code TaskExecutionException}
 * with a retry interval, each interval must be double the previous one capped at
 * {@code MAX_POLL_INTERVAL}, and the last interval must equal the cap.
 */
@Test
public void testExponentialBackoff()
        throws Exception
{
    Config config = newConfig();
    config.set("_command", BUCKET + "/" + KEY);
    when(taskRequest.getConfig()).thenReturn(config);

    // Every metadata lookup throws, so no ObjectMetadata fixture is needed in this test.
    when(s3Client.getObjectMetadata(any(GetObjectMetadataRequest.class))).thenThrow(NOT_FOUND_EXCEPTION);

    Operator operator = factory.newOperator(newContext(projectPath, taskRequest));

    List<Integer> retryIntervals = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
        try {
            operator.run();
            fail();
        }
        catch (TaskExecutionException e) {
            assertThat(e.isError(), is(false));
            assertThat(e.getRetryInterval().isPresent(), is(true));
            retryIntervals.add(e.getRetryInterval().get());
            // Feed the state back in so the next run continues from the previous interval.
            Config lastStateParams = e.getStateParams(configFactory).get();
            when(taskRequest.getLastStateParams()).thenReturn(lastStateParams);
        }
    }

    for (int i = 1; i < retryIntervals.size(); i++) {
        int prevInterval = retryIntervals.get(i - 1);
        int interval = retryIntervals.get(i);
        assertThat(interval, is(Math.min(MAX_POLL_INTERVAL, prevInterval * 2)));
    }

    assertThat(retryIntervals.get(retryIntervals.size() - 1), is(MAX_POLL_INTERVAL));
}
/**
 * Writes a value to S3 as a single object, retrying on failure.
 *
 * <p>String data is uploaded as its bytes, binary data as its byte array, and any other value
 * is serialized to JSON (which also forces the mime type to {@code application/json}). The
 * payload is turned into bytes once; the declared content length is the byte count, and every
 * attempt reads from a fresh stream so a retry never sends a partially consumed body.
 *
 * <p>NOTE(review): strings are encoded with the platform default charset, as before; confirm
 * whether an explicit charset is wanted.
 *
 * @param amazonKey     credentials/settings used to obtain the S3 client
 * @param bucket        destination bucket
 * @param key           destination object key
 * @param metadata      user metadata, may be {@code null}; a {@code Content-Type} entry
 *                      supplies the mime type when {@code mimetype} is {@code null}
 * @param storage       storage class for the object
 * @param mimetype      content type, or {@code null} to derive it from the data
 * @param data          value to upload
 * @param retry         number of attempts; values below 1 are treated as 1
 * @param retryseconds  pause between attempts, in seconds
 * @param acl           canned ACL name, ignored when {@code null} or empty
 * @param aes256key     SSE customer key, ignored when {@code null} or empty
 * @param customheaders extra request headers, may be {@code null}
 * @throws Exception the failure of the last attempt, or an interruption while waiting
 */
private void writeData( AmazonKey amazonKey, String bucket, String key, Map<String, String> metadata, StorageClass storage, String mimetype, cfData data, int retry, int retryseconds, String acl, String aes256key, Map<String, String> customheaders ) throws Exception {
    if ( mimetype == null ) {
        if ( data.getDataType() == cfData.CFBINARYDATA ) {
            mimetype = "application/unknown";
        } else if ( cfData.isSimpleValue( data ) ) {
            mimetype = "text/plain";
        } else {
            mimetype = "application/json";
        }

        // Check to see if the mime type is in the metadata
        if ( metadata != null && metadata.containsKey( "Content-Type" ) ) {
            mimetype = metadata.get( "Content-Type" );
        }
    }

    // Build the payload once so it can be re-read from the start on every attempt.
    final byte[] payload;
    final long size;
    if ( data.getDataType() == cfData.CFSTRINGDATA ) {
        payload = data.getString().getBytes();
        // Content-Length is a byte count; String.length() is wrong for multi-byte characters.
        size = payload.length;
    } else if ( data.getDataType() == cfData.CFBINARYDATA ) {
        payload = ( (cfBinaryData) data ).getByteArray();
        size = ( (cfBinaryData) data ).getLength();
    } else {
        serializejson json = new serializejson();
        StringBuilder out = new StringBuilder();
        json.encodeJSON( out, data, false, CaseType.MAINTAIN, DateType.LONG );
        mimetype = "application/json";
        payload = out.toString().getBytes();
        size = payload.length;
    }

    // Setup the object data
    ObjectMetadata omd = new ObjectMetadata();
    if ( metadata != null ) {
        omd.setUserMetadata( metadata );
    }
    omd.setContentType( mimetype );
    omd.setContentLength( size );

    AmazonS3 s3Client = getAmazonS3( amazonKey );

    // Always try at least once, even if the caller passed a non-positive retry count.
    final int maxAttempts = Math.max( 1, retry );
    int attempts = 0;
    while ( attempts < maxAttempts ) {
        try {
            // A fresh stream per attempt: a failed attempt may have consumed the previous one.
            PutObjectRequest por = new PutObjectRequest( bucket, key, new java.io.ByteArrayInputStream( payload ), omd );
            por.setStorageClass( storage );

            if ( aes256key != null && !aes256key.isEmpty() ) {
                por.setSSECustomerKey( new SSECustomerKey( aes256key ) );
            }

            if ( acl != null && !acl.isEmpty() ) {
                por.setCannedAcl( amazonKey.getAmazonCannedAcl( acl ) );
            }

            if ( customheaders != null && !customheaders.isEmpty() ) {
                for ( Map.Entry<String, String> header : customheaders.entrySet() ) {
                    por.putCustomRequestHeader( header.getKey(), header.getValue() );
                }
            }

            s3Client.putObject( por );
            break;
        } catch ( Exception e ) {
            cfEngine.log( "Failed: AmazonS3Write(bucket=" + bucket + "; key=" + key + "; attempt=" + ( attempts + 1 ) + "; exception=" + e.getMessage() + ")" );
            attempts++;

            if ( attempts == maxAttempts ) {
                throw e;
            }
            // long arithmetic so a large retryseconds cannot overflow into a negative sleep
            Thread.sleep( retryseconds * 1000L );
        }
    }
}
/**
 * Uploads a local file to S3, either synchronously with retries or by handing it to the
 * background uploader.
 *
 * <p>In the synchronous path the local file is deleted (when requested) only after an upload
 * attempt has succeeded; at least one attempt is always made, so a non-positive
 * {@code retry} can no longer delete a file that was never uploaded.
 *
 * @param amazonKey     credentials/settings used to obtain the S3 client
 * @param bucket        destination bucket
 * @param key           destination object key
 * @param metadata      user metadata, may be {@code null}
 * @param storage       storage class for the object
 * @param localpath     path of the file to upload
 * @param retry         number of attempts; values below 1 are treated as 1
 * @param retryseconds  pause between attempts, in seconds
 * @param deletefile    whether to delete the local file after a successful upload
 * @param background    when {@code true}, queue the upload on {@code BackgroundUploader} and
 *                      return immediately
 * @param callback      passed through to the background uploader
 * @param callbackdata  passed through to the background uploader
 * @param appname       passed through to the background uploader
 * @param acl           canned ACL name, ignored when {@code null} or empty
 * @param aes256key     SSE customer key, ignored when {@code null} or empty
 * @param customheaders extra request headers, may be {@code null}
 * @throws Exception if the path is not a file, the last attempt fails, or the wait between
 *                   attempts is interrupted
 */
private void writeFile( AmazonKey amazonKey, String bucket, String key, Map<String, String> metadata, StorageClass storage, String localpath, int retry, int retryseconds, boolean deletefile, boolean background, String callback, String callbackdata, String appname, String acl, String aes256key, Map<String, String> customheaders ) throws Exception {
    File localFile = new File( localpath );
    if ( !localFile.isFile() ) {
        throw new Exception( "The file specified does not exist: " + localpath );
    }

    // Push this to the background loader to handle and return immediately
    if ( background ) {
        BackgroundUploader.acceptFile( amazonKey, bucket, key, metadata, storage, localpath, retry, retryseconds, deletefile, callback, callbackdata, appname, acl, aes256key, customheaders );
        return;
    }

    // Setup the object data
    ObjectMetadata omd = new ObjectMetadata();
    if ( metadata != null ) {
        omd.setUserMetadata( metadata );
    }

    AmazonS3 s3Client = getAmazonS3( amazonKey );

    // Always try at least once, even if the caller passed a non-positive retry count.
    final int maxAttempts = Math.max( 1, retry );
    int attempts = 0;
    while ( attempts < maxAttempts ) {
        try {
            PutObjectRequest por = new PutObjectRequest( bucket, key, localFile );
            por.setMetadata( omd );
            por.setStorageClass( storage );

            if ( acl != null && !acl.isEmpty() ) {
                por.setCannedAcl( amazonKey.getAmazonCannedAcl( acl ) );
            }

            if ( aes256key != null && !aes256key.isEmpty() ) {
                por.setSSECustomerKey( new SSECustomerKey( aes256key ) );
            }

            if ( customheaders != null && !customheaders.isEmpty() ) {
                for ( Map.Entry<String, String> header : customheaders.entrySet() ) {
                    por.putCustomRequestHeader( header.getKey(), header.getValue() );
                }
            }

            s3Client.putObject( por );
            break;
        } catch ( Exception e ) {
            cfEngine.log( "Failed: AmazonS3Write(bucket=" + bucket + "; key=" + key + "; file=" + localFile + "; attempt=" + ( attempts + 1 ) + "; exception=" + e.getMessage() + ")" );
            attempts++;

            if ( attempts == maxAttempts ) {
                throw e;
            }
            // long arithmetic so a large retryseconds cannot overflow into a negative sleep
            Thread.sleep( retryseconds * 1000L );
        }
    }

    // Reaching this point means an attempt succeeded; the last failure would have been rethrown.
    if ( deletefile && !localFile.delete() ) {
        cfEngine.log( "AmazonS3Write: uploaded but failed to delete local file: " + localFile );
    }
}
/**
 * Uploads {@code BYTE_SOURCE} with a single PUT and checks that the content and the metadata
 * read back match what was sent. A metadata field listed in the matching {@code Quirks} set
 * for the current blob store type is neither sent nor checked.
 */
@Test
public void testSinglepartUpload() throws Exception {
    String blobName = "singlepart-upload";
    String cacheControl = "max-age=3600";
    String contentDisposition = "attachment; filename=new.jpg";
    String contentEncoding = "gzip";
    String contentLanguage = "fr";
    String contentType = "audio/mp4";
    Map<String, String> userMetadata = ImmutableMap.of(
            "key1", "value1",
            "key2", "value2");

    boolean checkCacheControl =
            !Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType);
    boolean checkContentDisposition =
            !Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType);
    boolean checkContentEncoding =
            !Quirks.NO_CONTENT_ENCODING.contains(blobStoreType);
    boolean checkContentLanguage =
            !Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType);

    // Metadata sent with the upload.
    ObjectMetadata sent = new ObjectMetadata();
    if (checkCacheControl) {
        sent.setCacheControl(cacheControl);
    }
    if (checkContentDisposition) {
        sent.setContentDisposition(contentDisposition);
    }
    if (checkContentEncoding) {
        sent.setContentEncoding(contentEncoding);
    }
    if (checkContentLanguage) {
        sent.setContentLanguage(contentLanguage);
    }
    sent.setContentLength(BYTE_SOURCE.size());
    sent.setContentType(contentType);
    // TODO: expires
    sent.setUserMetadata(userMetadata);

    client.putObject(containerName, blobName, BYTE_SOURCE.openStream(),
            sent);

    // Read the object back and compare the payload.
    S3Object fetched = client.getObject(containerName, blobName);
    try (InputStream actual = fetched.getObjectContent();
            InputStream expected = BYTE_SOURCE.openStream()) {
        assertThat(actual).hasContentEqualTo(expected);
    }

    // Compare the metadata that came back.
    ObjectMetadata received = fetched.getObjectMetadata();
    if (checkCacheControl) {
        assertThat(received.getCacheControl()).isEqualTo(cacheControl);
    }
    if (checkContentDisposition) {
        assertThat(received.getContentDisposition()).isEqualTo(
                contentDisposition);
    }
    if (checkContentEncoding) {
        assertThat(received.getContentEncoding()).isEqualTo(
                contentEncoding);
    }
    if (checkContentLanguage) {
        assertThat(received.getContentLanguage()).isEqualTo(
                contentLanguage);
    }
    assertThat(received.getContentType()).isEqualTo(contentType);
    // TODO: expires
    assertThat(received.getUserMetadata()).isEqualTo(userMetadata);
}
/**
 * Uploads an object in two parts (one of {@code MINIMUM_MULTIPART_SIZE} bytes and one of a
 * single byte), completes the multipart upload, and checks that the container lists exactly
 * one object whose content and metadata match what was sent. A metadata field listed in the
 * matching {@code Quirks} set for the current blob store type is neither sent nor checked.
 */
@Test
public void testMultipartUpload() throws Exception {
    String blobName = "multipart-upload";
    String cacheControl = "max-age=3600";
    String contentDisposition = "attachment; filename=new.jpg";
    String contentEncoding = "gzip";
    String contentLanguage = "fr";
    String contentType = "audio/mp4";
    Map<String, String> userMetadata = ImmutableMap.of(
            "key1", "value1",
            "key2", "value2");

    boolean checkCacheControl =
            !Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType);
    boolean checkContentDisposition =
            !Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType);
    boolean checkContentEncoding =
            !Quirks.NO_CONTENT_ENCODING.contains(blobStoreType);
    boolean checkContentLanguage =
            !Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType);

    // Metadata supplied when the multipart upload is initiated.
    ObjectMetadata sent = new ObjectMetadata();
    if (checkCacheControl) {
        sent.setCacheControl(cacheControl);
    }
    if (checkContentDisposition) {
        sent.setContentDisposition(contentDisposition);
    }
    if (checkContentEncoding) {
        sent.setContentEncoding(contentEncoding);
    }
    if (checkContentLanguage) {
        sent.setContentLanguage(contentLanguage);
    }
    sent.setContentType(contentType);
    // TODO: expires
    sent.setUserMetadata(userMetadata);

    InitiateMultipartUploadResult initiated = client.initiateMultipartUpload(
            new InitiateMultipartUploadRequest(containerName, blobName,
                    sent));
    String uploadId = initiated.getUploadId();

    // One full-size part followed by a one-byte part.
    ByteSource wholePayload = TestUtils.randomByteSource().slice(
            0, MINIMUM_MULTIPART_SIZE + 1);
    ByteSource firstSlice = wholePayload.slice(0, MINIMUM_MULTIPART_SIZE);
    ByteSource secondSlice = wholePayload.slice(MINIMUM_MULTIPART_SIZE, 1);

    UploadPartResult firstPart = client.uploadPart(new UploadPartRequest()
            .withBucketName(containerName)
            .withKey(blobName)
            .withUploadId(uploadId)
            .withPartNumber(1)
            .withPartSize(firstSlice.size())
            .withInputStream(firstSlice.openStream()));
    UploadPartResult secondPart = client.uploadPart(new UploadPartRequest()
            .withBucketName(containerName)
            .withKey(blobName)
            .withUploadId(uploadId)
            .withPartNumber(2)
            .withPartSize(secondSlice.size())
            .withInputStream(secondSlice.openStream()));

    client.completeMultipartUpload(new CompleteMultipartUploadRequest(
            containerName, blobName, uploadId,
            ImmutableList.of(firstPart.getPartETag(),
                    secondPart.getPartETag())));

    ObjectListing listing = client.listObjects(containerName);
    assertThat(listing.getObjectSummaries()).hasSize(1);

    // Read the assembled object back and compare the payload.
    S3Object fetched = client.getObject(containerName, blobName);
    try (InputStream actual = fetched.getObjectContent();
            InputStream expected = wholePayload.openStream()) {
        assertThat(actual).hasContentEqualTo(expected);
    }

    // Compare the metadata that came back.
    ObjectMetadata received = fetched.getObjectMetadata();
    if (checkCacheControl) {
        assertThat(received.getCacheControl()).isEqualTo(cacheControl);
    }
    if (checkContentDisposition) {
        assertThat(received.getContentDisposition()).isEqualTo(
                contentDisposition);
    }
    if (checkContentEncoding) {
        assertThat(received.getContentEncoding()).isEqualTo(
                contentEncoding);
    }
    if (checkContentLanguage) {
        assertThat(received.getContentLanguage()).isEqualTo(
                contentLanguage);
    }
    assertThat(received.getContentType()).isEqualTo(contentType);
    // TODO: expires
    assertThat(received.getUserMetadata()).isEqualTo(userMetadata);
}
/**
 * Puts an object with metadata, copies it within the same container without supplying new
 * metadata, and checks that the copy has the same content, length and metadata as the source.
 * A metadata field listed in the matching {@code Quirks} set for the current blob store type
 * is neither sent nor checked.
 */
@Test
public void testCopyObjectPreserveMetadata() throws Exception {
    String fromName = "from-name";
    String toName = "to-name";
    String cacheControl = "max-age=3600";
    String contentDisposition = "attachment; filename=old.jpg";
    String contentEncoding = "gzip";
    String contentLanguage = "en";
    String contentType = "audio/ogg";
    Map<String, String> userMetadata = ImmutableMap.of(
            "key1", "value1",
            "key2", "value2");

    boolean checkCacheControl =
            !Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType);
    boolean checkContentDisposition =
            !Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType);
    boolean checkContentEncoding =
            !Quirks.NO_CONTENT_ENCODING.contains(blobStoreType);
    boolean checkContentLanguage =
            !Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType);

    // Metadata stored on the source object.
    ObjectMetadata sourceMetadata = new ObjectMetadata();
    if (checkCacheControl) {
        sourceMetadata.setCacheControl(cacheControl);
    }
    sourceMetadata.setContentLength(BYTE_SOURCE.size());
    if (checkContentDisposition) {
        sourceMetadata.setContentDisposition(contentDisposition);
    }
    if (checkContentEncoding) {
        sourceMetadata.setContentEncoding(contentEncoding);
    }
    if (checkContentLanguage) {
        sourceMetadata.setContentLanguage(contentLanguage);
    }
    sourceMetadata.setContentType(contentType);
    // TODO: expires
    sourceMetadata.setUserMetadata(userMetadata);
    client.putObject(containerName, fromName, BYTE_SOURCE.openStream(),
            sourceMetadata);

    // Plain copy: no replacement metadata is supplied.
    client.copyObject(containerName, fromName, containerName, toName);

    S3Object copied = client.getObject(containerName, toName);
    try (InputStream actual = copied.getObjectContent();
            InputStream expected = BYTE_SOURCE.openStream()) {
        assertThat(actual).hasContentEqualTo(expected);
    }

    // The copy must carry the source's metadata.
    ObjectMetadata copiedMetadata = copied.getObjectMetadata();
    assertThat(copiedMetadata.getContentLength()).isEqualTo(
            BYTE_SOURCE.size());
    if (checkCacheControl) {
        assertThat(copiedMetadata.getCacheControl()).isEqualTo(
                cacheControl);
    }
    if (checkContentDisposition) {
        assertThat(copiedMetadata.getContentDisposition()).isEqualTo(
                contentDisposition);
    }
    if (checkContentEncoding) {
        assertThat(copiedMetadata.getContentEncoding()).isEqualTo(
                contentEncoding);
    }
    if (checkContentLanguage) {
        assertThat(copiedMetadata.getContentLanguage()).isEqualTo(
                contentLanguage);
    }
    assertThat(copiedMetadata.getContentType()).isEqualTo(contentType);
    // TODO: expires
    assertThat(copiedMetadata.getUserMetadata()).isEqualTo(userMetadata);
}
/**
 * Puts an object with one set of metadata, copies it while supplying a different set through
 * {@code withNewObjectMetadata}, and checks that the copy has the source's content but the
 * replacement metadata. A metadata field listed in the matching {@code Quirks} set for the
 * current blob store type is neither sent nor checked.
 */
@Test
public void testCopyObjectReplaceMetadata() throws Exception {
    String fromName = "from-name";
    String toName = "to-name";

    // Metadata stored on the source object; none of it should survive the copy.
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(BYTE_SOURCE.size());
    if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
        metadata.setCacheControl("max-age=3600");
    }
    if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
        metadata.setContentDisposition("attachment; filename=old.jpg");
    }
    if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
        metadata.setContentEncoding("compress");
    }
    if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
        metadata.setContentLanguage("en");
    }
    metadata.setContentType("audio/ogg");
    // TODO: expires
    metadata.setUserMetadata(ImmutableMap.of(
            "key1", "value1",
            "key2", "value2"));
    client.putObject(containerName, fromName, BYTE_SOURCE.openStream(),
            metadata);

    // Replacement metadata sent with the copy request.
    String cacheControl = "max-age=1800";
    String contentDisposition = "attachment; filename=new.jpg";
    String contentEncoding = "gzip";
    String contentLanguage = "fr";
    String contentType = "audio/mp4";
    ObjectMetadata contentMetadata = new ObjectMetadata();
    if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
        contentMetadata.setCacheControl(cacheControl);
    }
    if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
        contentMetadata.setContentDisposition(contentDisposition);
    }
    if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
        contentMetadata.setContentEncoding(contentEncoding);
    }
    if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
        contentMetadata.setContentLanguage(contentLanguage);
    }
    contentMetadata.setContentType(contentType);
    // TODO: expires
    Map<String, String> userMetadata = ImmutableMap.of(
            "key3", "value3",
            "key4", "value4");
    contentMetadata.setUserMetadata(userMetadata);
    client.copyObject(new CopyObjectRequest(
            containerName, fromName, containerName, toName)
            .withNewObjectMetadata(contentMetadata));

    S3Object object = client.getObject(containerName, toName);
    try (InputStream actual = object.getObjectContent();
            InputStream expected = BYTE_SOURCE.openStream()) {
        assertThat(actual).hasContentEqualTo(expected);
    }

    // All assertions below must look at the metadata read back from the copy.
    ObjectMetadata toContentMetadata = object.getObjectMetadata();
    if (!Quirks.NO_CACHE_CONTROL_SUPPORT.contains(blobStoreType)) {
        // Previously this checked the request object, which made the assertion a tautology.
        assertThat(toContentMetadata.getCacheControl()).isEqualTo(
                cacheControl);
    }
    if (!Quirks.NO_CONTENT_DISPOSITION.contains(blobStoreType)) {
        assertThat(toContentMetadata.getContentDisposition()).isEqualTo(
                contentDisposition);
    }
    if (!Quirks.NO_CONTENT_ENCODING.contains(blobStoreType)) {
        assertThat(toContentMetadata.getContentEncoding()).isEqualTo(
                contentEncoding);
    }
    if (!Quirks.NO_CONTENT_LANGUAGE.contains(blobStoreType)) {
        assertThat(toContentMetadata.getContentLanguage()).isEqualTo(
                contentLanguage);
    }
    assertThat(toContentMetadata.getContentType()).isEqualTo(
            contentType);
    // TODO: expires
    assertThat(toContentMetadata.getUserMetadata()).isEqualTo(
            userMetadata);
}
/**
 * Runs the processor against a mocked S3 client and checks that the request uses the
 * configured bucket and the flow file's {@code filename} as key, with requester-pays off and
 * no version id, and that the resulting flow file carries the attributes and content derived
 * from the stubbed response.
 */
@Test
public void testGetObject() throws IOException {
    runner.setProperty(FetchS3Object.REGION, "us-east-1");
    runner.setProperty(FetchS3Object.BUCKET, "request-bucket");

    final Map<String, String> incomingAttributes = new HashMap<>();
    incomingAttributes.put("filename", "request-key");
    runner.enqueue(new byte[0], incomingAttributes);

    // Response object the mocked client returns.
    S3Object stubbedObject = new S3Object();
    stubbedObject.setBucketName("response-bucket-name");
    stubbedObject.setKey("response-key");
    stubbedObject.setObjectContent(new StringInputStream("Some Content"));

    // A spy is used so getETag() can be stubbed while the setters behave normally.
    ObjectMetadata stubbedMetadata = Mockito.spy(ObjectMetadata.class);
    stubbedMetadata.setContentDisposition("key/path/to/file.txt");
    stubbedMetadata.setContentType("text/plain");
    stubbedMetadata.setContentMD5("testMD5hash");
    Date expiration = new Date();
    stubbedMetadata.setExpirationTime(expiration);
    stubbedMetadata.setExpirationTimeRuleId("testExpirationRuleId");
    Map<String, String> userMetadata = new HashMap<>();
    userMetadata.put("userKey1", "userValue1");
    userMetadata.put("userKey2", "userValue2");
    stubbedMetadata.setUserMetadata(userMetadata);
    stubbedMetadata.setSSEAlgorithm("testAlgorithm");
    Mockito.when(stubbedMetadata.getETag()).thenReturn("test-etag");
    stubbedObject.setObjectMetadata(stubbedMetadata);
    Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(stubbedObject);

    runner.run(1);

    // Exactly one request, built from the processor properties and flow file attributes.
    ArgumentCaptor<GetObjectRequest> requestCaptor = ArgumentCaptor.forClass(GetObjectRequest.class);
    Mockito.verify(mockS3Client, Mockito.times(1)).getObject(requestCaptor.capture());
    GetObjectRequest sentRequest = requestCaptor.getValue();
    assertEquals("request-bucket", sentRequest.getBucketName());
    assertEquals("request-key", sentRequest.getKey());
    assertFalse(sentRequest.isRequesterPays());
    assertNull(sentRequest.getVersionId());

    // One flow file on success, populated from the stubbed response.
    runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
    final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
    MockFlowFile flowFile = successFlowFiles.get(0);
    flowFile.assertAttributeEquals("s3.bucket", "response-bucket-name");
    flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
    flowFile.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
    flowFile.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
    flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
    flowFile.assertAttributeEquals("hash.value", "testMD5hash");
    flowFile.assertAttributeEquals("hash.algorithm", "MD5");
    flowFile.assertAttributeEquals("s3.etag", "test-etag");
    flowFile.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime()));
    flowFile.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId");
    flowFile.assertAttributeEquals("userKey1", "userValue1");
    flowFile.assertAttributeEquals("userKey2", "userValue2");
    flowFile.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm");
    flowFile.assertContentEquals("Some Content");
}
/**
 * Same scenario as a plain fetch, but with the {@code REQUESTER_PAYS} property set to
 * {@code "true"}: the request sent to the mocked client must have requester-pays enabled,
 * and the resulting flow file must still carry the attributes and content derived from the
 * stubbed response.
 */
@Test
public void testGetObjectWithRequesterPays() throws IOException {
    runner.setProperty(FetchS3Object.REGION, "us-east-1");
    runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
    runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");

    final Map<String, String> incomingAttributes = new HashMap<>();
    incomingAttributes.put("filename", "request-key");
    runner.enqueue(new byte[0], incomingAttributes);

    // Response object the mocked client returns.
    S3Object stubbedObject = new S3Object();
    stubbedObject.setBucketName("response-bucket-name");
    stubbedObject.setKey("response-key");
    stubbedObject.setObjectContent(new StringInputStream("Some Content"));

    // A spy is used so getETag() can be stubbed while the setters behave normally.
    ObjectMetadata stubbedMetadata = Mockito.spy(ObjectMetadata.class);
    stubbedMetadata.setContentDisposition("key/path/to/file.txt");
    stubbedMetadata.setContentType("text/plain");
    stubbedMetadata.setContentMD5("testMD5hash");
    Date expiration = new Date();
    stubbedMetadata.setExpirationTime(expiration);
    stubbedMetadata.setExpirationTimeRuleId("testExpirationRuleId");
    Map<String, String> userMetadata = new HashMap<>();
    userMetadata.put("userKey1", "userValue1");
    userMetadata.put("userKey2", "userValue2");
    stubbedMetadata.setUserMetadata(userMetadata);
    stubbedMetadata.setSSEAlgorithm("testAlgorithm");
    Mockito.when(stubbedMetadata.getETag()).thenReturn("test-etag");
    stubbedObject.setObjectMetadata(stubbedMetadata);
    Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(stubbedObject);

    runner.run(1);

    // Exactly one request, this time flagged as requester-pays.
    ArgumentCaptor<GetObjectRequest> requestCaptor = ArgumentCaptor.forClass(GetObjectRequest.class);
    Mockito.verify(mockS3Client, Mockito.times(1)).getObject(requestCaptor.capture());
    GetObjectRequest sentRequest = requestCaptor.getValue();
    assertEquals("request-bucket", sentRequest.getBucketName());
    assertEquals("request-key", sentRequest.getKey());
    assertTrue(sentRequest.isRequesterPays());
    assertNull(sentRequest.getVersionId());

    // One flow file on success, populated from the stubbed response.
    runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
    final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
    MockFlowFile flowFile = successFlowFiles.get(0);
    flowFile.assertAttributeEquals("s3.bucket", "response-bucket-name");
    flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
    flowFile.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
    flowFile.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
    flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
    flowFile.assertAttributeEquals("hash.value", "testMD5hash");
    flowFile.assertAttributeEquals("hash.algorithm", "MD5");
    flowFile.assertAttributeEquals("s3.etag", "test-etag");
    flowFile.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime()));
    flowFile.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId");
    flowFile.assertAttributeEquals("userKey1", "userValue1");
    flowFile.assertAttributeEquals("userKey2", "userValue2");
    flowFile.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm");
    flowFile.assertContentEquals("Some Content");
}