下面列出了org.apache.http.concurrent.BasicFuture#com.amazonaws.services.s3.transfer.TransferManager 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Downloads the objects found under an S3 key prefix into a local folder and blocks until the
 * transfer has finished.
 *
 * <p>Reads the required parameters {@code bucket}, {@code s3prefix} and {@code localFolder}, and the
 * optional {@code s3filePrefix} (falls back to the empty string). An empty {@code s3filePrefix}
 * selects every key; otherwise each key is tested with the predicate returned by
 * {@code getKeyFilterByFileNamePrefix}.
 *
 * @param params parsed program arguments
 */
private static void downloadByFullPathAndFileNamePrefix(ParameterTool params) {
    final String bucket = params.getRequired("bucket");
    final String s3prefix = params.getRequired("s3prefix");
    final String localFolder = params.getRequired("localFolder");
    final String s3filePrefix = params.get("s3filePrefix", "");
    TransferManager tx = TransferManagerBuilder.defaultTransferManager();
    Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
    KeyFilter keyFilter = s3filePrefix.isEmpty() ? KeyFilter.INCLUDE_ALL :
        objectSummary -> keyPredicate.test(objectSummary.getKey());
    try {
        tx.downloadDirectory(bucket, s3prefix, new File(localFolder), keyFilter).waitForCompletion();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers further up the stack can still observe
        // the interruption; catching the exception clears it.
        Thread.currentThread().interrupt();
        System.out.println("Transfer interrupted");
    } finally {
        // Always release the transfer manager, whether or not the transfer succeeded.
        tx.shutdownNow();
    }
}
/**
 * Downloads the objects found under an S3 key prefix into a local folder and blocks until the
 * transfer has finished.
 *
 * <p>Reads the required parameters {@code bucket}, {@code s3prefix} and {@code localFolder}, and the
 * optional {@code s3filePrefix} (falls back to the empty string). An empty {@code s3filePrefix}
 * selects every key; otherwise each key is tested with the predicate returned by
 * {@code getKeyFilterByFileNamePrefix}.
 *
 * @param params parsed program arguments
 */
private static void downloadByFullPathAndFileNamePrefix(ParameterTool params) {
    final String bucket = params.getRequired("bucket");
    final String s3prefix = params.getRequired("s3prefix");
    final String localFolder = params.getRequired("localFolder");
    final String s3filePrefix = params.get("s3filePrefix", "");
    TransferManager tx = TransferManagerBuilder.defaultTransferManager();
    Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
    KeyFilter keyFilter = s3filePrefix.isEmpty() ? KeyFilter.INCLUDE_ALL :
        objectSummary -> keyPredicate.test(objectSummary.getKey());
    try {
        tx.downloadDirectory(bucket, s3prefix, new File(localFolder), keyFilter).waitForCompletion();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers further up the stack can still observe
        // the interruption; catching the exception clears it.
        Thread.currentThread().interrupt();
        System.out.println("Transfer interrupted");
    } finally {
        // Always release the transfer manager, whether or not the transfer succeeded.
        tx.shutdownNow();
    }
}
/**
 * Uploads a local directory to S3 and reports progress while the upload runs.
 *
 * @param dir_path local directory to upload
 * @param bucket_name destination bucket
 * @param key_prefix key prefix passed to {@code uploadDirectory}
 * @param recursive passed to {@code uploadDirectory}; also echoed in the console message
 * @param pause only echoed in the console message; not otherwise used by this method
 */
public static void uploadDirWithSubprogress(
        String dir_path, String bucket_name, String key_prefix, boolean recursive, boolean pause) {
    System.out.println(
        "directory: " + dir_path + (recursive ? " (recursive)" : "") + (pause ? " (pause)" : ""));
    TransferManager xfer_mgr = new TransferManager();
    try {
        MultipleFileUpload multi_upload =
            xfer_mgr.uploadDirectory(bucket_name, key_prefix, new File(dir_path), recursive);
        // loop with Transfer.isDone()
        XferMgrProgress.showMultiUploadProgress(multi_upload);
        // or block with Transfer.waitForCompletion()
        XferMgrProgress.waitForCompletion(multi_upload);
    } catch (AmazonServiceException e) {
        System.err.println(e.getErrorMessage());
        System.exit(1);
    } finally {
        // Release the transfer manager on every path that leaves the try block, including
        // exceptions other than AmazonServiceException, which previously skipped the shutdown.
        xfer_mgr.shutdownNow();
    }
}
/** Checks that the transfer manager is shut down once after a copy that raises no error. */
@Test
public void copyCheckTransferManagerIsShutdown() throws Exception {
    client.putObject("source", "data", inputData);

    // Stubbed copy handle and transfer manager handed out by the mocked factory.
    Copy copyHandle = Mockito.mock(Copy.class);
    when(copyHandle.getProgress()).thenReturn(new TransferProgress());
    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenReturn(copyHandle);
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);

    copier.copy();

    verify(transferManager).shutdownNow();
}
/**
 * Checks that the transfer manager is still shut down, and the original cause is kept, when
 * submitting the copy job throws an {@code AmazonServiceException}.
 */
@Test
public void copyCheckTransferManagerIsShutdownWhenSubmittingJobExceptionsAreThrown() throws Exception {
    client.putObject("source", "data", inputData);

    // Transfer manager whose copy(...) call fails immediately.
    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenThrow(new AmazonServiceException("MyCause"));
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);

    try {
        copier.copy();
        fail("exception should have been thrown");
    } catch (CircusTrainException expected) {
        verify(transferManager).shutdownNow();
        assertThat(expected.getCause().getMessage(), startsWith("MyCause"));
    }
}
/** Checks that, with the default copier options, the copy request carries no new object metadata. */
@Test
public void copyDefaultCopierOptions() throws Exception {
    client.putObject("source", "data", inputData);

    // Stubbed copy handle and transfer manager handed out by the mocked factory.
    Copy copyHandle = Mockito.mock(Copy.class);
    when(copyHandle.getProgress()).thenReturn(new TransferProgress());
    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenReturn(copyHandle);
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);

    copier.copy();

    // Capture the request that was actually submitted to the transfer manager.
    ArgumentCaptor<CopyObjectRequest> requestCaptor = ArgumentCaptor.forClass(CopyObjectRequest.class);
    verify(transferManager)
        .copy(requestCaptor.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
    CopyObjectRequest submittedRequest = requestCaptor.getValue();
    assertNull(submittedRequest.getNewObjectMetadata());
}
/**
 * Creates a blob by uploading the given stream to the configured bucket.
 *
 * <p>The stream is wrapped in a {@code MetricsInputStream}, uploaded through a transfer manager
 * created for this call, and closed when the upload attempt ends.
 *
 * @param blobData content to upload; must not be null
 * @param headers blob headers, forwarded to the internal {@code create} overload
 * @return the blob produced by the internal {@code create} overload
 * @throws BlobStoreException if the upload is interrupted
 */
@Override
@Guarded(by = STARTED)
public Blob create(final InputStream blobData, final Map<String, String> headers) {
    checkNotNull(blobData);
    return create(headers, destination -> {
        try (InputStream data = blobData) {
            MetricsInputStream input = new MetricsInputStream(data);
            TransferManager transferManager = new TransferManager(s3);
            try {
                transferManager.upload(getConfiguredBucket(), destination, input, new ObjectMetadata())
                    .waitForCompletion();
            } finally {
                // A transfer manager is created per call, so release it here. Passing false
                // leaves the shared s3 client open for subsequent operations.
                transferManager.shutdownNow(false);
            }
            return input.getMetrics();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread before translating the exception.
            Thread.currentThread().interrupt();
            throw new BlobStoreException("error uploading blob", e, null);
        }
    });
}
/**
 * Starts downloading one S3 object into {@code base} unless the local copy is already up to date.
 *
 * <p>The target path is the object's key with the first {@code pathPrefix.length() + 1} characters
 * removed, resolved against {@code base}. Started downloads are recorded in
 * {@code startedDownloads}.
 *
 * @param manager transfer manager used to start the download
 * @param base local base directory
 * @param pathPrefix key prefix stripped from the object's key
 * @param summary listing entry of the object to download
 */
public void startDownload(TransferManager manager, File base, String pathPrefix, S3ObjectSummary summary) throws AmazonServiceException, IOException {
    String relativeKey = summary.getKey().substring(pathPrefix.length() + 1);
    File targetFile = FileUtils.getFile(base, relativeKey);
    long remoteModified = summary.getLastModified().getTime();

    // File.lastModified() is 0 for a missing file, so a missing target is always downloaded.
    if (targetFile.lastModified() >= remoteModified) {
        return;
    }

    // The parent directory must exist before the download is started.
    FileUtils.forceMkdir(targetFile.getParentFile());
    Download download = manager.download(summary.getBucketName(), summary.getKey(), targetFile);
    // Remember the download together with the remote timestamp.
    startedDownloads.add(new Memo(download, targetFile, remoteModified));
}
/**
 * Downloads a single S3 object to {@code destinationFile} and waits for the download to finish.
 *
 * <p>The parent directory is created if it is missing. If it cannot be created, an error is logged
 * and the method returns without downloading.
 *
 * @param tx transfer manager used for the download
 * @param bucketName source bucket
 * @param sourcePrefixKey key of the object to download
 * @param destinationFile local path to write to
 * @throws Exception wrapping any failure raised while preparing or performing the download
 */
private void downloadFile(TransferManager tx,
                          String bucketName,
                          String sourcePrefixKey,
                          String destinationFile) throws Exception {
    try {
        final File snapshotFile = new File(destinationFile);
        // getParentFile() is null when destinationFile has no directory component; in that case
        // there is nothing to create (the previous new File(getParent()) threw an NPE here).
        final File parentDir = snapshotFile.getParentFile();
        if (parentDir != null && !parentDir.isDirectory()) {
            // mkdirs() also returns false when the directory appeared in the meantime, so only
            // give up if it still does not exist afterwards.
            if (!parentDir.mkdirs() && !parentDir.isDirectory()) {
                LOGGER.error(
                    "Error creating parent directory for file: {}. Skipping to next",
                    destinationFile);
                return;
            }
        }
        snapshotFile.createNewFile();
        final Download download = tx.download(bucketName, sourcePrefixKey, snapshotFile);
        download.waitForCompletion();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise hide the interruption from the calling thread.
            Thread.currentThread().interrupt();
        }
        LOGGER.error("Error downloading the file {} : {}", destinationFile, e);
        throw new Exception(e);
    }
}
/**
 * Downloads the objects under a key prefix into a local directory, reporting progress.
 *
 * @param bucket_name source bucket
 * @param key_prefix key prefix passed to {@code downloadDirectory}
 * @param dir_path local destination directory
 * @param pause only echoed in the console message; not otherwise used by this method
 */
public static void downloadDir(String bucket_name, String key_prefix,
        String dir_path, boolean pause) {
    System.out.println("downloading to directory: " + dir_path +
            (pause ? " (pause)" : ""));
    // snippet-start:[s3.java1.s3_xfer_mgr_download.directory]
    TransferManager xfer_mgr = TransferManagerBuilder.standard().build();
    try {
        MultipleFileDownload xfer = xfer_mgr.downloadDirectory(
                bucket_name, key_prefix, new File(dir_path));
        // loop with Transfer.isDone()
        XferMgrProgress.showTransferProgress(xfer);
        // or block with Transfer.waitForCompletion()
        XferMgrProgress.waitForCompletion(xfer);
    } catch (AmazonServiceException e) {
        System.err.println(e.getErrorMessage());
        System.exit(1);
    } finally {
        // Release the transfer manager on every path that leaves the try block, including
        // exceptions other than AmazonServiceException, which previously skipped the shutdown.
        xfer_mgr.shutdownNow();
    }
    // snippet-end:[s3.java1.s3_xfer_mgr_download.directory]
}
/**
 * Downloads a single S3 object to a local file, reporting progress.
 *
 * @param bucket_name source bucket
 * @param key_name key of the object to download
 * @param file_path local destination path
 * @param pause only echoed in the console message; not otherwise used by this method
 */
public static void downloadFile(String bucket_name, String key_name,
        String file_path, boolean pause) {
    System.out.println("Downloading to file: " + file_path +
            (pause ? " (pause)" : ""));
    // snippet-start:[s3.java1.s3_xfer_mgr_download.single]
    File f = new File(file_path);
    TransferManager xfer_mgr = TransferManagerBuilder.standard().build();
    try {
        Download xfer = xfer_mgr.download(bucket_name, key_name, f);
        // loop with Transfer.isDone()
        XferMgrProgress.showTransferProgress(xfer);
        // or block with Transfer.waitForCompletion()
        XferMgrProgress.waitForCompletion(xfer);
    } catch (AmazonServiceException e) {
        System.err.println(e.getErrorMessage());
        System.exit(1);
    } finally {
        // Release the transfer manager on every path that leaves the try block, including
        // exceptions other than AmazonServiceException, which previously skipped the shutdown.
        xfer_mgr.shutdownNow();
    }
    // snippet-end:[s3.java1.s3_xfer_mgr_download.single]
}
/**
 * Copies one S3 object to another bucket/key, reporting progress.
 *
 * @param from_bucket source bucket
 * @param from_key source key
 * @param to_bucket destination bucket
 * @param to_key destination key
 */
public static void copyObjectSimple(String from_bucket, String from_key,
        String to_bucket, String to_key) {
    // snippet-start:[s3.java1.s3_xfer_mgr_copy.copy_object]
    System.out.println("Copying s3 object: " + from_key);
    System.out.println(" from bucket: " + from_bucket);
    System.out.println(" to s3 object: " + to_key);
    System.out.println(" in bucket: " + to_bucket);
    TransferManager xfer_mgr = TransferManagerBuilder.standard().build();
    try {
        Copy xfer = xfer_mgr.copy(from_bucket, from_key, to_bucket, to_key);
        // loop with Transfer.isDone()
        XferMgrProgress.showTransferProgress(xfer);
        // or block with Transfer.waitForCompletion()
        XferMgrProgress.waitForCompletion(xfer);
    } catch (AmazonServiceException e) {
        System.err.println(e.getErrorMessage());
        System.exit(1);
    } finally {
        // Release the transfer manager on every path that leaves the try block, including
        // exceptions other than AmazonServiceException, which previously skipped the shutdown.
        xfer_mgr.shutdownNow();
    }
    // snippet-end:[s3.java1.s3_xfer_mgr_copy.copy_object]
}
/**
 * Uploads a local directory to S3, reporting progress.
 *
 * @param dir_path local directory to upload
 * @param bucket_name destination bucket
 * @param key_prefix key prefix passed to {@code uploadDirectory}
 * @param recursive passed to {@code uploadDirectory}; also echoed in the console message
 * @param pause only echoed in the console message; not otherwise used by this method
 */
public static void uploadDir(String dir_path, String bucket_name,
        String key_prefix, boolean recursive, boolean pause) {
    System.out.println("directory: " + dir_path + (recursive ?
                " (recursive)" : "") + (pause ? " (pause)" : ""));
    // snippet-start:[s3.java1.s3_xfer_mgr_upload.directory]
    TransferManager xfer_mgr = TransferManagerBuilder.standard().build();
    try {
        MultipleFileUpload xfer = xfer_mgr.uploadDirectory(bucket_name,
                key_prefix, new File(dir_path), recursive);
        // loop with Transfer.isDone()
        XferMgrProgress.showTransferProgress(xfer);
        // or block with Transfer.waitForCompletion()
        XferMgrProgress.waitForCompletion(xfer);
    } catch (AmazonServiceException e) {
        System.err.println(e.getErrorMessage());
        System.exit(1);
    } finally {
        // Release the transfer manager on every path that leaves the try block, including
        // exceptions other than AmazonServiceException, which previously skipped the shutdown.
        xfer_mgr.shutdownNow();
    }
    // snippet-end:[s3.java1.s3_xfer_mgr_upload.directory]
}
/**
 * Downloads a single S3 object to the local path named in {@code params}, logging the request
 * before the transfer and the outcome after it.
 *
 * @param params bucket name, key and local path of the download
 * @return the transfer results reported by {@code performTransfer}
 * @throws InterruptedException declared by this method; presumably raised when the transfer is
 *         interrupted (not visible here, confirm against {@code performTransfer})
 */
@Override
public S3FileTransferResultsDto downloadFile(final S3FileTransferRequestParamsDto params) throws InterruptedException
{
    LOGGER.info("Downloading S3 file... s3Key=\"{}\" s3BucketName=\"{}\" localPath=\"{}\"", params.getS3KeyPrefix(), params.getS3BucketName(),
        params.getLocalPath());

    // Callback that starts the actual download once a transfer manager is available.
    final Transferer downloadTransferer = new Transferer()
    {
        @Override
        public Transfer performTransfer(TransferManager transferManager)
        {
            final File localFile = new File(params.getLocalPath());
            return s3Operations.download(params.getS3BucketName(), params.getS3KeyPrefix(), localFile, transferManager);
        }
    };

    final S3FileTransferResultsDto transferResults = performTransfer(params, downloadTransferer);

    LOGGER.info(
        "Downloaded S3 file to the local system. s3Key=\"{}\" s3BucketName=\"{}\" localPath=\"{}\" totalBytesTransferred={} transferDuration=\"{}\"",
        params.getS3KeyPrefix(), params.getS3BucketName(), params.getLocalPath(), transferResults.getTotalBytesTransferred(),
        HerdDateUtils.formatDuration(transferResults.getDurationMillis()));

    logOverallTransferRate(transferResults);

    return transferResults;
}
/**
 * Builds a new transfer manager for the given request parameters.
 *
 * <p>When a maximum thread count is supplied, the manager runs on a fixed thread pool of that size; otherwise the manager's own default threading is used.
 *
 * @param params the parameters.
 *
 * @return a newly created transfer manager.
 */
private TransferManager getTransferManager(final S3FileTransferRequestParamsDto params)
{
    // A fresh transfer manager is returned on every call. The TransferManager Javadocs recommend sharing a single instance, but a shared instance
    // could become unusable for later transfers once TransferManager.shutdown(true) has released its underlying resources.
    if (params.getMaxThreads() != null)
    {
        // Caller asked for a specific thread count: supply our own fixed-size executor.
        LOGGER.info("Creating a transfer manager. fixedThreadPoolSize={}", params.getMaxThreads());
        return new TransferManager(getAmazonS3(params), Executors.newFixedThreadPool(params.getMaxThreads()));
    }

    // No thread count given: let the transfer manager choose its own threading.
    return new TransferManager(getAmazonS3(params));
}
/**
 * Creates a transfer manager whose credentials come from the {@code s3.accessKey} and
 * {@code s3.secretKey} system properties.
 *
 * @return a new transfer manager
 */
private static TransferManager createManager() {
    // The properties are looked up each time a getter is invoked, not once at creation time.
    AWSCredentials systemPropertyCredentials = new AWSCredentials() {
        @Override
        public String getAWSSecretKey() {
            return System.getProperty("s3.secretKey");
        }

        @Override
        public String getAWSAccessKeyId() {
            return System.getProperty("s3.accessKey");
        }
    };
    return new TransferManager(systemPropertyCredentials);
}
/**
 * Downloads a file from S3 to the local file system, logging progress until the transfer is done.
 *
 * <p>Credentials are resolved through the default AWS credentials provider chain. The transfer
 * manager created for the download is shut down before this method returns.
 *
 * @param bucketName AWS S3 bucket name
 * @param key (example: android/apkFolder/ApkName.apk)
 * @param file (local file name)
 * @param pollingInterval (polling interval in sec for S3 download status determination)
 * @throws RuntimeException if an {@code AmazonClientException} is raised while polling the transfer
 */
public void download(final String bucketName, final String key, final File file, long pollingInterval) {
    LOGGER.info("App will be downloaded from s3.");
    LOGGER.info(String.format("[Bucket name: %s] [Key: %s] [File: %s]", bucketName, key, file.getAbsolutePath()));
    DefaultAWSCredentialsProviderChain credentialProviderChain = new DefaultAWSCredentialsProviderChain();
    TransferManager tx = new TransferManager(
            credentialProviderChain.getCredentials());
    try {
        // Starting the transfer stays outside the catch below so that a failure to start is
        // still reported to callers with its original exception type.
        Download appDownload = tx.download(bucketName, key, file);
        try {
            LOGGER.info("Transfer: " + appDownload.getDescription());
            LOGGER.info(" State: " + appDownload.getState());
            LOGGER.info(" Progress: ");
            // You can poll your transfer's status to check its progress
            while (!appDownload.isDone()) {
                LOGGER.info(" transferred: " + (int) (appDownload.getProgress().getPercentTransferred() + 0.5) + "%");
                CommonUtils.pause(pollingInterval);
            }
            LOGGER.info(" State: " + appDownload.getState());
        } catch (AmazonClientException e) {
            // Keep the original exception as the cause; string concatenation (unlike concat)
            // also tolerates a null message.
            throw new RuntimeException("File wasn't downloaded from s3. See log: " + e.getMessage(), e);
        }
    } finally {
        // The transfer manager is private to this call, so release it once polling has ended.
        tx.shutdownNow();
    }
}
/**
 * Checks that the application context exposes the S3 beans and that {@code S3Configuration}
 * carries the expected values.
 */
@Test
public void runAppAndBasicTest() {
    // S3 client bean is available.
    AmazonS3Client amazonS3Client = server.getSpringContext().getBean(AmazonS3Client.class);
    assertThat(amazonS3Client != null, is(true));

    // Configuration bean holds the expected values.
    S3Configuration config = server.getSpringContext().getBean(S3Configuration.class);
    assertThat(config.getAccessKey(), is(""));
    assertThat(config.getSecretKey(), is(""));
    assertThat(config.getSessionToken() == null, is(true));
    assertThat(config.getRegion() == null, is(true));
    assertThat(config.getUploadThreads(), is(5));
    assertThat(config.getUploadThreadNamePrefix(), is("s3-transfer-manager-worker-"));

    // Utility bean is available.
    S3Utils utils = server.getSpringContext().getBean(S3Utils.class);
    assertThat(utils != null, is(true));

    // Transfer manager bean is available.
    TransferManager transferManager = server.getSpringContext().getBean(TransferManager.class);
    assertThat(transferManager != null, is(true));
}
/**
 * Checks that {@code getInputStream} returns a stream and that the file provided by the supplier
 * no longer exists afterwards.
 */
@Test
public void getInputStreamSupplier() throws AmazonClientException, InterruptedException, IOException {
    // Temporary file that the supplier hands to ReadUtils.
    File tempFile = Files.createTempFile("micro-s3", "test").toFile();
    assertTrue(tempFile.exists());

    // Transfer manager that returns a mocked download for any request.
    Download mockedDownload = mock(Download.class);
    TransferManager mockedManager = mock(TransferManager.class);
    when(mockedManager.download(anyString(), anyString(), any())).thenReturn(mockedDownload);

    ReadUtils readUtils = new ReadUtils(mockedManager, "test");
    InputStream result = readUtils.getInputStream("", "", () -> tempFile);

    assertNotNull(result);
    assertFalse(tempFile.exists());
}
/**
 * Checks that {@code getFileInputStream} performs exactly one download, waits for it once, and
 * returns a stream.
 */
@Test
@SneakyThrows
public void getFileInputStream() {
    // Transfer manager that returns a mocked download for any request.
    Download mockedDownload = mock(Download.class);
    TransferManager mockedManager = mock(TransferManager.class);
    when(mockedManager.download(anyString(), anyString(), any())).thenReturn(mockedDownload);

    String tempDirectory = System.getProperty("java.io.tmpdir");
    ReadUtils utils = new ReadUtils(mockedManager, tempDirectory);

    InputStream result = utils.getFileInputStream("bucket", "key");

    assertNotNull(result);
    verify(mockedManager, times(1)).download(anyString(), anyString(), any(File.class));
    verify(mockedDownload, times(1)).waitForCompletion();
    result.close();
}
/**
 * Creates a resolver from already-built collaborators; the arguments are stored as given.
 *
 * @param transferManager transfer manager to store
 * @param uriFormat URI format string to store
 * @param bucket bucket name to store
 * @param keyPrefix key prefix to store
 * @param enableServerSideEncryption server-side encryption flag to store
 */
@VisibleForTesting
S3SchemaUriResolver(
        TransferManager transferManager,
        String uriFormat,
        String bucket,
        String keyPrefix,
        boolean enableServerSideEncryption) {
    // Collaborator first, then the S3 location, then the remaining settings.
    this.transferManager = transferManager;
    this.bucket = bucket;
    this.keyPrefix = keyPrefix;
    this.enableServerSideEncryption = enableServerSideEncryption;
    this.uriFormat = uriFormat;
}
/**
 * Downloads a single S3 object to a local file and blocks until the transfer has finished.
 *
 * <p>Reads the required parameters {@code bucket}, {@code s3file} and {@code localFile}.
 *
 * @param params parsed program arguments
 */
private static void downloadFile(ParameterTool params) {
    final String bucket = params.getRequired("bucket");
    final String s3file = params.getRequired("s3file");
    final String localFile = params.getRequired("localFile");
    TransferManager tx = TransferManagerBuilder.defaultTransferManager();
    try {
        tx.download(bucket, s3file, new File(localFile)).waitForCompletion();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers further up the stack can still observe
        // the interruption; catching the exception clears it.
        Thread.currentThread().interrupt();
        System.out.println("Transfer interrupted");
    } finally {
        // Always release the transfer manager, whether or not the transfer succeeded.
        tx.shutdownNow();
    }
}
/**
 * Creates the helper, passing the context, target configuration and transfer manager on to the
 * superclass constructor and recording the error-stage flag.
 *
 * @param context target context forwarded to the superclass
 * @param s3TargetConfigBean S3 target configuration forwarded to the superclass
 * @param transferManager transfer manager forwarded to the superclass
 * @param isErrorStage flag stored on this instance
 */
DefaultFileHelper(
Target.Context context,
S3TargetConfigBean s3TargetConfigBean,
TransferManager transferManager,
boolean isErrorStage
) {
super(context, s3TargetConfigBean, transferManager);
this.isErrorStage = isErrorStage;
}
/**
 * Checks that the factory builds a transfer manager bound to the given client and configured
 * with the multipart copy threshold and part size taken from the copier options.
 */
@Test
public void shouldCreateDefaultTransferManagerClient() {
    // A plain map instead of double-brace initialization, which would create an anonymous
    // HashMap subclass holding a reference to the enclosing test instance.
    HashMap<String, Object> copierOptions = new HashMap<>();
    copierOptions.put(S3S3CopierOptions.Keys.MULTIPART_COPY_THRESHOLD.keyName(), MULTIPART_COPY_THRESHOLD_VALUE);
    copierOptions.put(S3S3CopierOptions.Keys.MULTIPART_COPY_PART_SIZE.keyName(), MULTIPART_COPY_PART_SIZE);
    S3S3CopierOptions s3Options = new S3S3CopierOptions(copierOptions);

    TransferManagerFactory factory = new TransferManagerFactory();
    TransferManager transferManager = factory.newInstance(mockClient, s3Options);

    assertThat(transferManager.getAmazonS3Client(), is(mockClient));
    TransferManagerConfiguration managerConfig = transferManager.getConfiguration();
    assertThat(managerConfig.getMultipartCopyPartSize(), is(MULTIPART_COPY_PART_SIZE));
    assertThat(managerConfig.getMultipartCopyThreshold(), is(MULTIPART_COPY_THRESHOLD_VALUE));
}
/**
 * Checks that, when waiting for the copy keeps failing, the copy is submitted three times, the
 * transfer manager is shut down, and the failure message reports the exhausted attempts.
 */
@Test
public void copyCheckTransferManagerIsShutdownWhenMaxRetriesExceeded() throws Exception {
    client.putObject("source", "data", inputData);

    // Copy handle whose waitForCompletion() always fails.
    Copy failingCopy = Mockito.mock(Copy.class);
    when(failingCopy.getProgress()).thenReturn(new TransferProgress());
    doThrow(new AmazonClientException("cause")).when(failingCopy).waitForCompletion();

    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenReturn(failingCopy);
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);

    try {
        copier.copy();
        fail("exception should have been thrown");
    } catch (CircusTrainException expected) {
        verify(transferManager).shutdownNow();
        verify(transferManager, Mockito.times(3))
            .copy(any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class));
        assertThat(expected.getMessage(), is("1 job(s) failed the maximum number of copy attempts, 3"));
    }
}
/**
 * Checks that enabling the server-side-encryption option makes the submitted copy request carry
 * the AES-256 SSE algorithm in its new object metadata.
 */
@Test
public void copyServerSideEncryption() throws Exception {
    client.putObject("source", "data", inputData);

    // Copier options with server-side encryption switched on.
    Map<String, Object> optionValues = new HashMap<>();
    optionValues.put(S3S3CopierOptions.Keys.S3_SERVER_SIDE_ENCRYPTION.keyName(), "true");
    S3S3CopierOptions encryptionOptions = new S3S3CopierOptions(optionValues);

    // Stubbed copy handle and transfer manager handed out by the mocked factory.
    Copy copyHandle = Mockito.mock(Copy.class);
    when(copyHandle.getProgress()).thenReturn(new TransferProgress());
    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenReturn(copyHandle);
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(encryptionOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, encryptionOptions);

    copier.copy();

    // Capture the request that was actually submitted to the transfer manager.
    ArgumentCaptor<CopyObjectRequest> requestCaptor = ArgumentCaptor.forClass(CopyObjectRequest.class);
    verify(transferManager)
        .copy(requestCaptor.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
    CopyObjectRequest submittedRequest = requestCaptor.getValue();
    assertThat(submittedRequest.getNewObjectMetadata().getSSEAlgorithm(),
        is(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION));
}
/**
 * Checks that the canned ACL configured in the copier options is set on the submitted copy
 * request.
 */
@Test
public void copyCannedAcl() throws Exception {
    client.putObject("source", "data", inputData);

    // Copier options requesting the BucketOwnerFullControl canned ACL.
    Map<String, Object> optionValues = new HashMap<>();
    optionValues.put(
        S3S3CopierOptions.Keys.CANNED_ACL.keyName(), CannedAccessControlList.BucketOwnerFullControl.toString());
    S3S3CopierOptions aclOptions = new S3S3CopierOptions(optionValues);

    // Stubbed copy handle and transfer manager handed out by the mocked factory.
    Copy copyHandle = Mockito.mock(Copy.class);
    when(copyHandle.getProgress()).thenReturn(new TransferProgress());
    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenReturn(copyHandle);
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(aclOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, aclOptions);

    copier.copy();

    // Capture the request that was actually submitted to the transfer manager.
    ArgumentCaptor<CopyObjectRequest> requestCaptor = ArgumentCaptor.forClass(CopyObjectRequest.class);
    verify(transferManager)
        .copy(requestCaptor.capture(), any(AmazonS3.class), any(TransferStateChangeListener.class));
    CopyObjectRequest submittedRequest = requestCaptor.getValue();
    assertThat(submittedRequest.getCannedAccessControlList(), is(CannedAccessControlList.BucketOwnerFullControl));
}
/**
 * Checks that, when submitting the copy throws an {@code AmazonClientException}, the transfer
 * manager is shut down and the resulting exception carries the expected message and cause.
 */
@Test
public void copySafelyShutDownTransferWhenRetryFails() throws Exception {
    client.putObject("source", "data", inputData);

    // Copy handle stubbed as in the sibling tests; the manager below throws before returning it.
    Copy copyHandle = Mockito.mock(Copy.class);
    when(copyHandle.getProgress()).thenReturn(new TransferProgress());

    // Transfer manager whose copy(...) call fails immediately.
    TransferManager transferManager = Mockito.mock(TransferManager.class);
    when(transferManager.copy(
        any(CopyObjectRequest.class), any(AmazonS3.class), any(TransferStateChangeListener.class)))
        .thenThrow(new AmazonClientException("S3 error"));
    TransferManagerFactory transferManagerFactory = Mockito.mock(TransferManagerFactory.class);
    when(transferManagerFactory.newInstance(any(AmazonS3.class), eq(s3S3CopierOptions)))
        .thenReturn(transferManager);

    Path source = new Path("s3://source/");
    Path target = new Path("s3://target/");
    List<Path> subLocations = new ArrayList<>();
    S3S3Copier copier = new S3S3Copier(source, subLocations, target, s3ClientFactory,
        transferManagerFactory, listObjectsRequestFactory, registry, s3S3CopierOptions);

    try {
        copier.copy();
        fail("Exception should have been thrown");
    } catch (CircusTrainException expected) {
        verify(transferManager).shutdownNow();
        assertThat(expected.getMessage(), is("Error in S3S3Copier:"));
        assertThat(expected.getCause().getMessage(), startsWith("S3 error"));
    }
}
/**
 * Builds a transfer manager for the given S3 client from the values held in
 * {@code transferManagerConfigs}.
 *
 * @param s3Client client the transfer manager operates on
 * @return the configured transfer manager
 * @throws StageException declared by this method; which call raises it is not visible here
 */
TransferManager createTransferManager(AmazonS3 s3Client) throws StageException {
    return createTransferManagerBuilder()
        .withS3Client(s3Client)
        // Multipart upload tuning.
        .withMinimumUploadPartSize(transferManagerConfigs.getMinimumUploadPartSize())
        .withMultipartUploadThreshold(transferManagerConfigs.getMultipartUploadThreshold())
        // Executor sized from the configured thread count; its pools are shut down with the manager.
        .withExecutorFactory(createExecutorFactory(transferManagerConfigs.getThreads()))
        .withShutDownThreadPools(true)
        .build();
}
/**
 * Upload from slave.
 *
 * <p>Scans {@code base} and starts an upload for every regular file that has no existing cache
 * entry under its key or is newer than that entry. Uploads are awaited in batches once more than
 * 20 are pending, and once more before returning.
 *
 * @param transferManager transfer manager used for the uploads and for the existing-entry lookup
 * @param base local directory to scan; when it does not exist nothing is uploaded
 * @param channel not used by this method
 * @return 0 if {@code base} does not exist, otherwise {@code uploads.count()} after the final wait
 */
@Override
public Integer invoke(final TransferManager transferManager, File base, VirtualChannel channel) throws IOException, InterruptedException {
    if (!base.exists()) {
        return 0;
    }
    final AtomicInteger count = new AtomicInteger(0);
    final Uploads uploads = new Uploads();
    final Map<String, S3ObjectSummary> summaries = lookupExistingCacheEntries(transferManager.getAmazonS3Client());
    // Find files to upload that match scan
    scanner.scan(base, new FileVisitor() {
        @Override
        public void visit(File f, String relativePath) throws IOException {
            if (f.isFile()) {
                String key = pathPrefix + "/" + relativePath;
                S3ObjectSummary summary = summaries.get(key);
                if (summary == null || f.lastModified() > summary.getLastModified().getTime()) {
                    final ObjectMetadata metadata = buildMetadata(f);
                    // IOUtils.toBufferedInputStream copies the whole content into memory but does
                    // not close its source, so the file stream is closed here to avoid leaking a
                    // file handle per uploaded file.
                    try (java.io.InputStream fileStream = FileUtils.openInputStream(f)) {
                        uploads.startUploading(transferManager, f, IOUtils.toBufferedInputStream(fileStream), new Destination(bucketName, key), metadata);
                    }
                    if (uploads.count() > 20) {
                        waitForUploads(count, uploads);
                    }
                }
            }
        }
    });
    // Wait for each file to complete before returning
    waitForUploads(count, uploads);
    // NOTE(review): `count` is handed to waitForUploads but never read here; if waitForUploads
    // accumulates the total into it and resets `uploads`, this should return count.get() - confirm.
    return uploads.count();
}