下面列出了怎么用com.amazonaws.services.s3.model.CopyObjectRequest的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void doRename( FileObject newFile ) throws Exception {
// no folder renames on S3
if ( getType().equals( FileType.FOLDER ) ) {
throw new FileSystemException( "vfs.provider/rename-not-supported.error" );
}
s3ObjectMetadata = fileSystem.getS3Client().getObjectMetadata( bucketName, key );
if ( s3ObjectMetadata == null ) {
// object doesn't exist
throw new FileSystemException( "vfs.provider/rename.error", this, newFile );
}
S3CommonFileObject dest = (S3CommonFileObject) newFile;
// 1. copy the file
CopyObjectRequest copyObjRequest = createCopyObjectRequest( bucketName, key, dest.bucketName, dest.key );
fileSystem.getS3Client().copyObject( copyObjRequest );
// 2. delete self
delete();
}
@Test
public void testDoRename() throws Exception {
String someNewBucketName = "someNewBucketName";
String someNewKey = "some/newKey";
S3FileName newFileName = new S3FileName( SCHEME, someNewBucketName, someNewBucketName + "/" + someNewKey, FileType.FILE );
S3FileObject newFile = new S3FileObject( newFileName, fileSystemSpy );
ArgumentCaptor<CopyObjectRequest> copyObjectRequestArgumentCaptor = ArgumentCaptor.forClass( CopyObjectRequest.class );
when( s3ServiceMock.doesBucketExistV2( someNewBucketName ) ).thenReturn( true );
s3FileObjectFileSpy.doAttach();
s3FileObjectFileSpy.moveTo( newFile );
verify( s3ServiceMock ).copyObject( copyObjectRequestArgumentCaptor.capture() );
assertEquals( someNewBucketName, copyObjectRequestArgumentCaptor.getValue().getDestinationBucketName() );
assertEquals( someNewKey, copyObjectRequestArgumentCaptor.getValue().getDestinationKey() );
assertEquals( BUCKET_NAME, copyObjectRequestArgumentCaptor.getValue().getSourceBucketName() );
assertEquals( origKey, copyObjectRequestArgumentCaptor.getValue().getSourceKey() );
}
@Test
public void testDoRename() throws Exception {
String someNewBucketName = "someNewBucketName";
String someNewKey = "some/newKey";
S3NFileName newFileName = new S3NFileName( SCHEME, someNewBucketName, "/" + someNewBucketName + "/" + someNewKey, FileType.FILE );
S3NFileObject newFile = new S3NFileObject( newFileName, fileSystemSpy );
ArgumentCaptor<CopyObjectRequest> copyObjectRequestArgumentCaptor = ArgumentCaptor.forClass( CopyObjectRequest.class );
when( s3ServiceMock.doesBucketExistV2( someNewBucketName ) ).thenReturn( true );
s3FileObjectFileSpy.moveTo( newFile );
verify( s3ServiceMock ).copyObject( copyObjectRequestArgumentCaptor.capture() );
assertEquals( someNewBucketName, copyObjectRequestArgumentCaptor.getValue().getDestinationBucketName() );
assertEquals( someNewKey, copyObjectRequestArgumentCaptor.getValue().getDestinationKey() );
assertEquals( BUCKET_NAME, copyObjectRequestArgumentCaptor.getValue().getSourceBucketName() );
assertEquals( origKey, copyObjectRequestArgumentCaptor.getValue().getSourceKey() );
}
@Override
public FreshenResult freshenRemoteObject(final RemoteObjectReference object) throws InterruptedException {
final String canonicalPath = ((S3RemoteObjectReference) object).canonicalPath;
final CopyObjectRequest copyRequest = new CopyObjectRequest(request.storageLocation.bucket,
canonicalPath,
request.storageLocation.bucket,
canonicalPath).withStorageClass(StorageClass.Standard);
try {
// attempt to refresh existing object in the bucket via an inplace copy
transferManager.copy(copyRequest).waitForCompletion();
return FreshenResult.FRESHENED;
} catch (final AmazonServiceException e) {
// AWS S3 under certain access policies can't return NoSuchKey (404)
// instead, it returns AccessDenied (403) — handle it the same way
if (e.getStatusCode() != 404 && e.getStatusCode() != 403) {
throw e;
}
// the freshen failed because the file/key didn't exist
return FreshenResult.UPLOAD_REQUIRED;
}
}
@Test
public void copyCheckTransferManagerIsShutdown() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenReturn(copy);
TransferProgress transferProgress = new TransferProgress();
when(copy.getProgress()).thenReturn(transferProgress);
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);
s3s3Copier.copy();
verify(mockedTransferManager).shutdownNow();
}
@Test
public void copyCheckTransferManagerIsShutdownWhenSubmittingJobExceptionsAreThrown() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
.thenReturn(mockedTransferManager);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenThrow(new AmazonServiceException("MyCause"));
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);
try {
s3s3Copier.copy();
fail("exception should have been thrown");
} catch (CircusTrainException e) {
verify(mockedTransferManager).shutdownNow();
assertThat(e.getCause().getMessage(), startsWith("MyCause"));
}
}
@Test
public void copyDefaultCopierOptions() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenReturn(copy);
TransferProgress transferProgress = new TransferProgress();
when(copy.getProgress()).thenReturn(transferProgress);
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);
s3s3Copier.copy();
ArgumentCaptor<CopyObjectRequest> argument = ArgumentCaptor.forClass(CopyObjectRequest.class);
verify(mockedTransferManager).copy(argument.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
CopyObjectRequest copyObjectRequest = argument.getValue();
assertNull(copyObjectRequest.getNewObjectMetadata());
}
@VisibleForTesting
CopyObjectResult atomicCopy(
S3ResourceId sourcePath, S3ResourceId destinationPath, ObjectMetadata sourceObjectMetadata)
throws AmazonClientException {
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(
sourcePath.getBucket(),
sourcePath.getKey(),
destinationPath.getBucket(),
destinationPath.getKey());
copyObjectRequest.setNewObjectMetadata(sourceObjectMetadata);
copyObjectRequest.setStorageClass(options.getS3StorageClass());
copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
return amazonS3.get().copyObject(copyObjectRequest);
}
/**
* Copies a file from one Amazon S3 bucket to another one.
*
* @param fileName
* This is the name of the file that is desired to be copied.
*
* @param destinationFileName
* The name of the destination file
*/
public String copyObject(String fileName, String destinationFileName) throws SenderException {
try {
bucketDoesNotExist(bucketName); //if bucket does not exists this method throws an exception
fileDoesNotExist(bucketName, fileName); //if object does not exists this method throws an exception
if (!s3Client.doesBucketExistV2(destinationBucketName))
bucketCreationWithObjectAction(destinationBucketName);
if (!s3Client.doesObjectExist(destinationBucketName, destinationFileName)) {
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, fileName, destinationBucketName, destinationFileName);
if (isStorageClassEnabled())
copyObjectRequest.setStorageClass(getStorageClass());
s3Client.copyObject(copyObjectRequest);
log.debug("Object with fileName [" + fileName + "] copied from bucket with bucketName [" + bucketName
+ "] into bucket with bucketName [" + destinationBucketName + "] and new fileName ["
+ destinationFileName + "]");
} else
throw new SenderException(" file with given name already exists, please specify a new name");
} catch (AmazonServiceException e) {
log.error("Failed to perform [copy] action on object with fileName [" + fileName + "]");
throw new SenderException("Failed to perform [copy] action on object with fileName [" + fileName + "]");
}
return destinationFileName;
}
private void initialiseCopyJobsFromListing(
AmazonS3URI sourceS3Uri,
final AmazonS3URI targetS3Uri,
ListObjectsRequest request,
ObjectListing listing) {
LOG
.debug("Found objects to copy {}, for request {}/{}", listing.getObjectSummaries(), request.getBucketName(),
request.getPrefix());
List<S3ObjectSummary> objectSummaries = listing.getObjectSummaries();
for (final S3ObjectSummary s3ObjectSummary : objectSummaries) {
totalBytesToReplicate += s3ObjectSummary.getSize();
String fileName = StringUtils.removeStart(s3ObjectSummary.getKey(), sourceS3Uri.getKey());
final String targetKey = Strings.nullToEmpty(targetS3Uri.getKey()) + fileName;
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(s3ObjectSummary.getBucketName(),
s3ObjectSummary.getKey(), targetS3Uri.getBucket(), targetKey);
if (s3s3CopierOptions.getCannedAcl() != null) {
copyObjectRequest.withCannedAccessControlList(s3s3CopierOptions.getCannedAcl());
}
applyObjectMetadata(copyObjectRequest);
TransferStateChangeListener stateChangeListener = new BytesTransferStateChangeListener(s3ObjectSummary,
targetS3Uri, targetKey);
copyJobRequests.add(new CopyJobRequest(copyObjectRequest, stateChangeListener));
}
}
private void applyObjectMetadata(CopyObjectRequest copyObjectRequest) {
if (s3s3CopierOptions.isS3ServerSideEncryption()) {
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
copyObjectRequest.setNewObjectMetadata(objectMetadata);
}
}
private Copy submitCopyJob(CopyJobRequest copyJob) {
CopyObjectRequest copyObjectRequest = copyJob.getCopyObjectRequest();
LOG
.info("Copying object from '{}/{}' to '{}/{}'", copyObjectRequest.getSourceBucketName(),
copyObjectRequest.getSourceKey(), copyObjectRequest.getDestinationBucketName(),
copyObjectRequest.getDestinationKey());
return transferManager.copy(copyObjectRequest, srcClient, copyJob.getTransferStateChangeListener());
}
@Test
public void copyCheckTransferManagerIsShutdownWhenMaxRetriesExceeded() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(copy.getProgress()).thenReturn(new TransferProgress());
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenReturn(copy);
doThrow(new AmazonClientException("cause")).when(copy).waitForCompletion();
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);
try {
s3s3Copier.copy();
fail("exception should have been thrown");
} catch (CircusTrainException e) {
verify(mockedTransferManager).shutdownNow();
verify(mockedTransferManager, Mockito.times(3))
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class));
assertThat(e.getMessage(), is("1 job(s) failed the maximum number of copy attempts, 3"));
}
}
@Test
public void copyServerSideEncryption() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
Map<String, Object> copierOptions = new HashMap<>();
copierOptions.put(S3S3CopierOptions.Keys.S3_SERVER_SIDE_ENCRYPTION.keyName(), "true");
S3S3CopierOptions customOptions = new S3S3CopierOptions(copierOptions);
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(customOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenReturn(copy);
TransferProgress transferProgress = new TransferProgress();
when(copy.getProgress()).thenReturn(transferProgress);
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, customOptions);
s3s3Copier.copy();
ArgumentCaptor<CopyObjectRequest> argument = ArgumentCaptor.forClass(CopyObjectRequest.class);
verify(mockedTransferManager).copy(argument.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
CopyObjectRequest copyObjectRequest = argument.getValue();
assertThat(copyObjectRequest.getNewObjectMetadata().getSSEAlgorithm(),
is(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION));
}
@Test
public void copyCannedAcl() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
Map<String, Object> copierOptions = new HashMap<>();
copierOptions
.put(S3S3CopierOptions.Keys.CANNED_ACL.keyName(), CannedAccessControlList.BucketOwnerFullControl.toString());
S3S3CopierOptions customOptions = new S3S3CopierOptions(copierOptions);
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(customOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenReturn(copy);
TransferProgress transferProgress = new TransferProgress();
when(copy.getProgress()).thenReturn(transferProgress);
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, customOptions);
s3s3Copier.copy();
ArgumentCaptor<CopyObjectRequest> argument = ArgumentCaptor.forClass(CopyObjectRequest.class);
verify(mockedTransferManager).copy(argument.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
CopyObjectRequest copyObjectRequest = argument.getValue();
assertThat(copyObjectRequest.getCannedAccessControlList(), is(CannedAccessControlList.BucketOwnerFullControl));
}
@Test
public void copySafelyShutDownTransferWhenRetryFails() throws Exception {
client.putObject("source", "data", inputData);
Path sourceBaseLocation = new Path("s3://source/");
Path replicaLocation = new Path("s3://target/");
List<Path> sourceSubLocations = new ArrayList<>();
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenThrow(new AmazonClientException("S3 error"));
TransferProgress transferProgress = new TransferProgress();
when(copy.getProgress()).thenReturn(transferProgress);
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);
try {
s3s3Copier.copy();
fail("Exception should have been thrown");
} catch (CircusTrainException e) {
verify(mockedTransferManager).shutdownNow();
assertThat(e.getMessage(), is("Error in S3S3Copier:"));
assertThat(e.getCause().getMessage(), startsWith("S3 error"));
}
}
@Override
public Parameters handleRequest(Parameters parameters, Context context) {
context.getLogger().log("Input Function [" + context.getFunctionName() + "], Parameters [" + parameters + "]");
// The archive location of the snapshot will be decided by the alert
// flag
String newFilename;
if (parameters.getSendAlert()) {
newFilename = parameters.getS3Key().replace("upload/", "archive/alerts/");
} else {
newFilename = parameters.getS3Key().replace("upload/", "archive/falsepositives/");
}
// Ensure that the first two hyphens are used to create sub-directories
// in the file path
newFilename = newFilename.replaceFirst("-", "/");
newFilename = newFilename.replaceFirst("-", "/");
// Using the S3 client, first copy the file to the archive, and then
// delete the original
AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(parameters.getS3Bucket(), parameters.getS3Key(), parameters.getS3Bucket(), newFilename);
client.copyObject(copyObjectRequest);
DeleteObjectRequest deleteObjectRequest = new DeleteObjectRequest(parameters.getS3Bucket(), parameters.getS3Key());
client.deleteObject(deleteObjectRequest);
// Place the new location in the parameters
parameters.setS3ArchivedKey(newFilename);
context.getLogger().log("Output Function [" + context.getFunctionName() + "], Parameters [" + parameters + "]");
return parameters;
}
private void copyFile(String srcKey, String dstKey) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("copyFile " + srcKey + " -> " + dstKey);
}
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
final ObjectMetadata dstom = srcom.clone();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
ProgressListener progressListener = new ProgressListener() {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventCode()) {
case ProgressEvent.PART_COMPLETED_EVENT_CODE:
statistics.incrementWriteOps(1);
break;
default:
break;
}
}
};
Copy copy = transfers.copy(copyObjectRequest);
copy.addProgressListener(progressListener);
try {
copy.waitForCopyResult();
statistics.incrementWriteOps(1);
} catch (InterruptedException e) {
throw new IOException("Got interrupted, cancelling");
}
}
private void copyFile(String srcKey, String dstKey) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("copyFile " + srcKey + " -> " + dstKey);
}
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
final ObjectMetadata dstom = srcom.clone();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
ProgressListener progressListener = new ProgressListener() {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventCode()) {
case ProgressEvent.PART_COMPLETED_EVENT_CODE:
statistics.incrementWriteOps(1);
break;
default:
break;
}
}
};
Copy copy = transfers.copy(copyObjectRequest);
copy.addProgressListener(progressListener);
try {
copy.waitForCopyResult();
statistics.incrementWriteOps(1);
} catch (InterruptedException e) {
throw new IOException("Got interrupted, cancelling");
}
}
private void testAtomicCopy(S3Options options) {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(options);
S3ResourceId sourcePath = S3ResourceId.fromUri("s3://bucket/from");
S3ResourceId destinationPath = S3ResourceId.fromUri("s3://bucket/to");
CopyObjectResult copyObjectResult = new CopyObjectResult();
if (getSSECustomerKeyMd5(options) != null) {
copyObjectResult.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
}
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(
sourcePath.getBucket(),
sourcePath.getKey(),
destinationPath.getBucket(),
destinationPath.getKey());
copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
when(s3FileSystem.getAmazonS3Client().copyObject(any(CopyObjectRequest.class)))
.thenReturn(copyObjectResult);
assertEquals(
getSSECustomerKeyMd5(options),
s3FileSystem.getAmazonS3Client().copyObject(copyObjectRequest).getSSECustomerKeyMd5());
ObjectMetadata sourceS3ObjectMetadata = new ObjectMetadata();
s3FileSystem.atomicCopy(sourcePath, destinationPath, sourceS3ObjectMetadata);
verify(s3FileSystem.getAmazonS3Client(), times(2)).copyObject(any(CopyObjectRequest.class));
}
@Override
public boolean move(String src, String dst, boolean archive) {
log.info("Move: {} to {}", src, dst);
// s3 has no move operation, so do a copy and delete
// this is not an atomic operation
Optional<S3Details> srcDetails = S3Details.from(src);
Optional<S3Details> dstDetails = S3Details.from(dst);
if (srcDetails.isPresent() && dstDetails.isPresent()) {
CopyObjectRequest cor = new CopyObjectRequest(srcDetails.get().getBucket(),
srcDetails.get().getKey(), dstDetails.get().getBucket(), dstDetails.get().getKey());
// make sure to set the storage class for file copy
if (archive) {
// set class for archive file
cor.setStorageClass(StorageClass.fromValue(StringUtils.upperCase(archiveStorageClass)));
} else {
// set class for parquet files
cor.setStorageClass(StorageClass.fromValue(StringUtils.upperCase(uploadStorageClass)));
}
try {
CopyObjectResult r = amazonS3.copyObject(cor);
if (Objects.nonNull(r.getETag())) {
// copy ok, delete src
amazonS3.deleteObject(srcDetails.get().getBucket(), srcDetails.get().getKey());
}
return true;
} catch (Exception e) {
log.error("Error during copying {} to ", src, dst, e);
}
}
return false;
}
static void copy(
AmazonS3 s3Client,
String srcBucket,
String sourceKey,
String destBucket,
String destKey,
boolean isMove
) {
CopyObjectRequest cp = new CopyObjectRequest(srcBucket, sourceKey, destBucket, destKey);
s3Client.copyObject(cp);
if (isMove) {
s3Client.deleteObject(new DeleteObjectRequest(srcBucket, sourceKey));
}
}
public cfData execute( cfSession _session, cfArgStructData argStruct ) throws cfmRunTimeException{
AmazonKey amazonKey = getAmazonKey(_session, argStruct);
AmazonS3 s3Client = getAmazonS3(amazonKey);
String bucket = getNamedStringParam(argStruct, "bucket", null );
String srckey = getNamedStringParam(argStruct, "srckey", null );
String deskey = getNamedStringParam(argStruct, "destkey", null );
String aes256key = getNamedStringParam(argStruct, "aes256key", null );
if ( srckey != null && srckey.charAt( 0 ) == '/' )
srckey = srckey.substring(1);
if ( deskey != null && deskey.charAt( 0 ) == '/' )
deskey = deskey.substring(1);
CopyObjectRequest cor = new CopyObjectRequest(bucket, srckey, bucket, deskey);
if ( aes256key != null && !aes256key.isEmpty() ){
cor.setSourceSSECustomerKey( new SSECustomerKey(aes256key) );
cor.setDestinationSSECustomerKey( new SSECustomerKey(aes256key) );
}
try {
s3Client.copyObject(cor);
s3Client.deleteObject(new DeleteObjectRequest(bucket, srckey));
return cfBooleanData.TRUE;
} catch (Exception e) {
throwException(_session, "AmazonS3: " + e.getMessage() );
return cfBooleanData.FALSE;
}
}
@Override
public void handlePayload(TOCPayload payload, WorkerState workerState) throws Exception {
TocInfo tocInfo = payload.tocInfo;
String logPrefix = "handlePayload() KeyCopy s3://" + this.sourceS3BucketName + "/" + tocInfo.path +
" => s3://" + this.targetS3BucketName +"/"+ tocInfo.path;
try {
CopyObjectRequest copyRequest = new CopyObjectRequest(this.sourceS3BucketName,
tocInfo.path,
this.targetS3BucketName,
tocInfo.path);
copyRequest.setStorageClass(storageClass);
// copyRequest.setGeneralProgressListener(this);
if (this.enableServerSideEncryption) {
copyRequest.putCustomRequestHeader("x-amz-server-side-encryption", "AES256");
}
CopyObjectResult copyResult = s3Client.copyObject(copyRequest);
logger.debug(logPrefix + " copied OK");
workerState.addTocPathWritten(new TocPathOpResult(payload.mode, true, tocInfo.path, "s3.copyKey", "OK"));
} catch(Exception e) {
logger.error(logPrefix + " unexpected ERROR: " + e.getMessage(),e);
workerState.addTocPathWriteFailure(
new TocPathOpResult(payload.mode, false, tocInfo.path, "s3.copyKey", logPrefix + " " + e.getMessage()));
}
}
protected CopyObjectRequest createCopyObjectRequest( String sourceBucket, String sourceKey, String destBucket, String destKey ) {
return new CopyObjectRequest( sourceBucket, sourceKey, destBucket, destKey );
}
@Override
protected CopyObjectRequest createCopyObjectRequest( String sourceBucket, String sourceKey, String destBucket, String destKey ) {
SimpleEntry<String, String> sourcePath = fixFilePath( sourceKey, sourceBucket );
SimpleEntry<String, String> destPath = fixFilePath( destKey, destBucket );
return new CopyObjectRequest( sourcePath.getValue(), sourcePath.getKey(), destPath.getValue(), destPath.getKey() );
}
public CopyJobRequest(CopyObjectRequest copyObjectRequest, TransferStateChangeListener transferStateChangeListener) {
this.copyObjectRequest = copyObjectRequest;
this.transferStateChangeListener = transferStateChangeListener;
}
public CopyObjectRequest getCopyObjectRequest() {
return copyObjectRequest;
}
@Test
public void copyRetryOnlyFailedCopyJobs() throws InterruptedException {
String sourceKey1 = "bar/data1";
String sourceKey2 = "bar/data2";
client.putObject("source", sourceKey1, inputData);
client.putObject("source", sourceKey2, inputData);
Path sourceBaseLocation = new Path("s3://source/bar/");
Path replicaLocation = new Path("s3://target/foo/");
List<Path> sourceSubLocations = new ArrayList<>();
TransferManagerFactory mockedTransferManagerFactory = Mockito.mock(TransferManagerFactory.class);
TransferManager mockedTransferManager = Mockito.mock(TransferManager.class);
when(mockedTransferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
.thenReturn(mockedTransferManager);
Copy copy = Mockito.mock(Copy.class);
when(mockedTransferManager
.copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
.thenReturn(copy);
TransferProgress transferProgress = new TransferProgress();
transferProgress.setTotalBytesToTransfer(7);
when(copy.getProgress()).thenReturn(transferProgress);
doThrow(new AmazonClientException("cause")).doNothing().when(copy).waitForCompletion();
S3S3Copier s3s3Copier = new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, s3ClientFactory,
mockedTransferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);
try {
Metrics metrics = s3s3Copier.copy();
ArgumentCaptor<CopyObjectRequest> captor = ArgumentCaptor.forClass(CopyObjectRequest.class);
verify(mockedTransferManager, Mockito.times(3))
.copy(captor.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
List<CopyObjectRequest> capturedCopyRequests = captor.getAllValues();
assertThat(capturedCopyRequests.get(0).getSourceKey(), is(sourceKey1));
assertThat(capturedCopyRequests.get(1).getSourceKey(), is(sourceKey2));
assertThat(capturedCopyRequests.get(2).getSourceKey(), is(sourceKey1));
verify(mockedTransferManager).shutdownNow();
verifyNoMoreInteractions(mockedTransferManager);
assertThat(metrics.getBytesReplicated(), is(14L));
assertThat(metrics.getMetrics().get(S3S3CopierMetrics.Metrics.TOTAL_BYTES_TO_REPLICATE.name()), is(14L));
} catch (CircusTrainException e) {
fail("Exception should not have been thrown");
}
}
@Override
public String run() throws Exception {
final String fromBucket = this.step.getFromBucket();
final String toBucket = this.step.getToBucket();
final String fromPath = this.step.getFromPath();
final String toPath = this.step.getToPath();
final String kmsId = this.step.getKmsId();
final Map<String, String> metadatas = new HashMap<>();
final CannedAccessControlList acl = this.step.getAcl();
final String cacheControl = this.step.getCacheControl();
final String contentType = this.step.getContentType();
final String sseAlgorithm = this.step.getSseAlgorithm();
final S3ClientOptions s3ClientOptions = this.step.createS3ClientOptions();
final EnvVars envVars = this.getContext().get(EnvVars.class);
if (this.step.getMetadatas() != null && this.step.getMetadatas().length != 0) {
for (String metadata : this.step.getMetadatas()) {
if (metadata.split(":").length == 2) {
metadatas.put(metadata.split(":")[0], metadata.split(":")[1]);
}
}
}
Preconditions.checkArgument(fromBucket != null && !fromBucket.isEmpty(), "From bucket must not be null or empty");
Preconditions.checkArgument(fromPath != null && !fromPath.isEmpty(), "From path must not be null or empty");
Preconditions.checkArgument(toBucket != null && !toBucket.isEmpty(), "To bucket must not be null or empty");
Preconditions.checkArgument(toPath != null && !toPath.isEmpty(), "To path must not be null or empty");
TaskListener listener = Execution.this.getContext().get(TaskListener.class);
listener.getLogger().format("Copying s3://%s/%s to s3://%s/%s%n", fromBucket, fromPath, toBucket, toPath);
CopyObjectRequest request = new CopyObjectRequest(fromBucket, fromPath, toBucket, toPath);
// Add metadata
if (metadatas.size() > 0 || (cacheControl != null && !cacheControl.isEmpty()) || (contentType != null && !contentType.isEmpty()) || (sseAlgorithm != null && !sseAlgorithm.isEmpty())) {
ObjectMetadata metas = new ObjectMetadata();
if (metadatas.size() > 0) {
metas.setUserMetadata(metadatas);
}
if (cacheControl != null && !cacheControl.isEmpty()) {
metas.setCacheControl(cacheControl);
}
if (contentType != null && !contentType.isEmpty()) {
metas.setContentType(contentType);
}
if (sseAlgorithm != null && !sseAlgorithm.isEmpty()) {
metas.setSSEAlgorithm(sseAlgorithm);
}
request.withNewObjectMetadata(metas);
}
// Add acl
if (acl != null) {
request.withCannedAccessControlList(acl);
}
// Add kms
if (kmsId != null && !kmsId.isEmpty()) {
listener.getLogger().format("Using KMS: %s%n", kmsId);
request.withSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(kmsId));
}
TransferManager mgr = TransferManagerBuilder.standard()
.withS3Client(AWSClientFactory.create(s3ClientOptions.createAmazonS3ClientBuilder(), envVars))
.build();
try {
final Copy copy = mgr.copy(request);
copy.addProgressListener((ProgressListener) progressEvent -> {
if (progressEvent.getEventType() == ProgressEventType.TRANSFER_COMPLETED_EVENT) {
listener.getLogger().println("Finished: " + copy.getDescription());
}
});
copy.waitForCompletion();
}
finally{
mgr.shutdownNow();
}
listener.getLogger().println("Copy complete");
return String.format("s3://%s/%s", toBucket, toPath);
}