下面列出了怎么用com.amazonaws.services.s3.model.MultipartUploadListing的API类实例代码及写法,或者点击链接到github查看源代码。
private void prepareTest(String filename) {
runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
runner.assertValid();
Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", filename);
ffAttributes.put("tagS3PII", "true");
runner.enqueue("Test Content", ffAttributes);
PutObjectResult putObjectResult = new PutObjectResult();
putObjectResult.setExpirationTime(new Date());
putObjectResult.setMetadata(new ObjectMetadata());
putObjectResult.setVersionId("test-version");
putObjectResult.setETag("test-etag");
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
MultipartUploadListing uploadListing = new MultipartUploadListing();
Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
}
private void cleanupMultipartUploads() {
final AmazonS3 s3Client = transferManager.getAmazonS3Client();
final Instant yesterdayInstant = ZonedDateTime.now().minusDays(1).toInstant();
logger.info("Cleaning up multipart uploads older than {}.", yesterdayInstant);
final ListMultipartUploadsRequest listMultipartUploadsRequest = new ListMultipartUploadsRequest(request.storageLocation.bucket)
.withPrefix(request.storageLocation.clusterId + "/" + request.storageLocation.datacenterId);
while (true) {
final MultipartUploadListing multipartUploadListing = s3Client.listMultipartUploads(listMultipartUploadsRequest);
multipartUploadListing.getMultipartUploads().stream()
.filter(u -> u.getInitiated().toInstant().isBefore(yesterdayInstant))
.forEach(u -> {
logger.info("Aborting multi-part upload for key \"{}\" initiated on {}", u.getKey(), u.getInitiated().toInstant());
try {
s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(request.storageLocation.bucket, u.getKey(), u.getUploadId()));
} catch (final AmazonClientException e) {
logger.error("Failed to abort multipart upload for key \"{}\".", u.getKey(), e);
}
});
if (!multipartUploadListing.isTruncated()) {
break;
}
listMultipartUploadsRequest
.withKeyMarker(multipartUploadListing.getKeyMarker())
.withUploadIdMarker(multipartUploadListing.getUploadIdMarker());
}
}
protected boolean localUploadExistsInS3(final AmazonS3Client s3, final String bucket, final MultipartState localState) {
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
for (MultipartUpload upload : listing.getMultipartUploads()) {
if (upload.getUploadId().equals(localState.getUploadId())) {
return true;
}
}
return false;
}
@Test
public void testStateRemove() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* store state, retrieve and validate, remove and validate
*/
PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
stateOrig.setUploadId("1234");
stateOrig.setContentLength(1234L);
processor.persistLocalState(cacheKey, stateOrig);
PutS3Object.MultipartState state1 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertEquals("1234", state1.getUploadId());
Assert.assertEquals(1234L, state1.getContentLength().longValue());
processor.persistLocalState(cacheKey, null);
PutS3Object.MultipartState state2 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertNull(state2);
}
/**
* {@inheritDoc} <p/> <p> Since a multipart upload in progress does not exist when in-memory, this method simply returns a preconfigured list. </p> <p>
* Returns a mock {@link MultipartUploadListing} based on the parameters and hints provided. By default returns a mock listing as defiend by {@link
* #getMultipartUploadListing()}. </p> <p> This operation takes the following hints when suffixed in listMultipartUploadsRequest.bucketName: <dl> <p/>
* <dt>MOCK_S3_BUCKET_NAME_SERVICE_EXCEPTION</dt> <dd>Throws a AmazonServiceException</dd> <p/> <dt>MOCK_S3_BUCKET_NAME_TRUNCATED_MULTIPART_LISTING</dt>
* <dd>Returns the listing as if it is truncated. See below for details.</dd> <p/> </dl> </p>
*/
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest, AmazonS3 s3Client)
{
if (listMultipartUploadsRequest.getBucketName().equals(MOCK_S3_BUCKET_NAME_SERVICE_EXCEPTION))
{
throw new AmazonServiceException(null);
}
else if (listMultipartUploadsRequest.getBucketName().equals(MOCK_S3_BUCKET_NAME_TRUNCATED_MULTIPART_LISTING))
{
MultipartUploadListing multipartUploadListing = getMultipartUploadListing();
// If listing request does not have upload ID marker set, mark the listing as truncated - this is done to truncate the multipart listing just once.
if (listMultipartUploadsRequest.getUploadIdMarker() == null)
{
multipartUploadListing.setNextUploadIdMarker("TEST_UPLOAD_MARKER_ID");
multipartUploadListing.setNextKeyMarker("TEST_KEY_MARKER_ID");
multipartUploadListing.setTruncated(true);
}
return multipartUploadListing;
}
else
{
return getMultipartUploadListing();
}
}
/**
* <p> Returns a mock {@link MultipartUploadListing}. </p> <p> The return object has the following properties. <dl> <dt>multipartUploads</dt> <dd>Length 3
* list</dd> <p/> <dt>multipartUploads[0].initiated</dt> <dd>5 minutes prior to the object creation time.</dd> <p/> <dt>multipartUploads[1].initiated</dt>
* <dd>15 minutes prior to the object creation time.</dd> <p/> <dt>multipartUploads[2].initiated</dt> <dd>20 minutes prior to the object creation time.</dd>
* </dl> <p/> All other properties as set to default as defined in the by {@link MultipartUploadListing} constructor. </p>
*
* @return a mock object
*/
private MultipartUploadListing getMultipartUploadListing()
{
// Return 3 multipart uploads with 2 of them started more than 10 minutes ago.
MultipartUploadListing multipartUploadListing = new MultipartUploadListing();
List<MultipartUpload> multipartUploads = new ArrayList<>();
multipartUploadListing.setMultipartUploads(multipartUploads);
Date now = new Date();
multipartUploads.add(getMultipartUpload(HerdDateUtils.addMinutes(now, -5)));
multipartUploads.add(getMultipartUpload(HerdDateUtils.addMinutes(now, -15)));
multipartUploads.add(getMultipartUpload(HerdDateUtils.addMinutes(now, -20)));
return multipartUploadListing;
}
protected boolean localUploadExistsInS3(final AmazonS3Client s3, final String bucket, final MultipartState localState) {
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
for (MultipartUpload upload : listing.getMultipartUploads()) {
if (upload.getUploadId().equals(localState.getUploadId())) {
return true;
}
}
return false;
}
@Test
public void testStateRemove() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* store state, retrieve and validate, remove and validate
*/
PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
stateOrig.setUploadId("1234");
stateOrig.setContentLength(1234L);
processor.persistLocalState(cacheKey, stateOrig);
PutS3Object.MultipartState state1 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertEquals("1234", state1.getUploadId());
Assert.assertEquals(1234L, state1.getContentLength().longValue());
processor.persistLocalState(cacheKey, null);
PutS3Object.MultipartState state2 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertNull(state2);
}
protected void ageoffS3Uploads(final ProcessContext context, final AmazonS3Client s3, final long now) {
MultipartUploadListing oldUploads = getS3AgeoffListAndAgeoffLocalState(context, s3, now);
for (MultipartUpload upload : oldUploads.getMultipartUploads()) {
abortS3MultipartUpload(s3, oldUploads.getBucketName(), upload);
}
}
protected MultipartUploadListing getS3AgeoffListAndAgeoffLocalState(final ProcessContext context, final AmazonS3Client s3, final long now) {
final long ageoff_interval = context.getProperty(MULTIPART_S3_AGEOFF_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final Long maxAge = context.getProperty(MULTIPART_S3_MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long ageCutoff = now - maxAge;
final List<MultipartUpload> ageoffList = new ArrayList<>();
if ((lastS3AgeOff.get() < now - ageoff_interval) && s3BucketLock.tryLock()) {
try {
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
for (MultipartUpload upload : listing.getMultipartUploads()) {
long uploadTime = upload.getInitiated().getTime();
if (uploadTime < ageCutoff) {
ageoffList.add(upload);
}
}
// ageoff any local state
ageoffLocalState(ageCutoff);
lastS3AgeOff.set(System.currentTimeMillis());
} catch(AmazonClientException e) {
if (e instanceof AmazonS3Exception
&& ((AmazonS3Exception)e).getStatusCode() == 403
&& ((AmazonS3Exception) e).getErrorCode().equals("AccessDenied")) {
getLogger().warn("AccessDenied checking S3 Multipart Upload list for {}: {} " +
"** The configured user does not have the s3:ListBucketMultipartUploads permission " +
"for this bucket, S3 ageoff cannot occur without this permission. Next ageoff check " +
"time is being advanced by interval to prevent checking on every upload **",
new Object[]{bucket, e.getMessage()});
lastS3AgeOff.set(System.currentTimeMillis());
} else {
getLogger().error("Error checking S3 Multipart Upload list for {}: {}",
new Object[]{bucket, e.getMessage()});
}
} finally {
s3BucketLock.unlock();
}
}
MultipartUploadListing result = new MultipartUploadListing();
result.setBucketName(bucket);
result.setMultipartUploads(ageoffList);
return result;
}
@Test
public void testLocalStatePersistence() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-v2");
upload2.setUploadId("1234");
uploadList.add(upload2);
final MultipartUpload upload3 = new MultipartUpload();
upload3.setKey(key + "-v3");
upload3.setUploadId("5678");
uploadList.add(upload3);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* reload and validate stored state
*/
final PutS3Object.MultipartState state1new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey1);
Assert.assertEquals("", state1new.getUploadId());
Assert.assertEquals(0L, state1new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
Assert.assertEquals(0L, state1new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
Assert.assertEquals(0L, state1new.getContentLength().longValue());
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(0L, state2new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
Assert.assertEquals(0L, state2new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
Assert.assertEquals(1234L, state2new.getContentLength().longValue());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(0L, state3new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
Assert.assertEquals(0L, state3new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
Assert.assertEquals(5678L, state3new.getContentLength().longValue());
}
@Test
public void testStatePersistsETags() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
/*
* persist state to caches so that
* 1. v2 has 2 and then 4 tags
* 2. v3 has 4 and then 2 tags
*/
state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
processor.persistLocalState(cacheKey2, state2orig);
state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
processor.persistLocalState(cacheKey2, state2orig);
state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
processor.persistLocalState(cacheKey3, state3orig);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key + "-bv2");
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-bv3");
upload2.setUploadId("5678");
uploadList.add(upload2);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* load state and validate that
* 1. v2 restore shows 4 tags
* 2. v3 restore shows 2 tags
*/
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(4, state2new.getPartETags().size());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(2, state3new.getPartETags().size());
}
@Ignore
@Test
public void testS3MultipartAgeoff() throws InterruptedException, IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final ProcessContext context = runner.getProcessContext();
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
// set check interval and age off to minimum values
runner.setProperty(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL, "1 milli");
runner.setProperty(PutS3Object.MULTIPART_S3_MAX_AGE, "1 milli");
// create some dummy uploads
for (Integer i = 0; i < 3; i++) {
final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(
BUCKET_NAME, "file" + i.toString() + ".txt");
try {
client.initiateMultipartUpload(initiateRequest);
} catch (AmazonClientException e) {
Assert.fail("Failed to initiate upload: " + e.getMessage());
}
}
// Age off is time dependent, so test has some timing constraints. This
// sleep() delays long enough to satisfy interval and age intervals.
Thread.sleep(2000L);
// System millis are used for timing, but it is incremented on each
// call to circumvent what appears to be caching in the AWS library.
// The increments are 1000 millis because AWS returns upload
// initiation times in whole seconds.
Long now = System.currentTimeMillis();
MultipartUploadListing uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now);
Assert.assertEquals(3, uploadList.getMultipartUploads().size());
MultipartUpload upload0 = uploadList.getMultipartUploads().get(0);
processor.abortS3MultipartUpload(client, BUCKET_NAME, upload0);
uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now+1000);
Assert.assertEquals(2, uploadList.getMultipartUploads().size());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test-upload.txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
runner.run();
uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now+2000);
Assert.assertEquals(0, uploadList.getMultipartUploads().size());
}
public void setListing(MultipartUploadListing newlisting) {
listing = newlisting;
}
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest)
throws AmazonClientException, AmazonServiceException {
return listing;
}
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest, AmazonS3 s3Client)
{
return s3Client.listMultipartUploads(listMultipartUploadsRequest);
}
@Override
public int abortMultipartUploads(S3FileTransferRequestParamsDto params, Date thresholdDate)
{
// Create an Amazon S3 client.
AmazonS3Client s3Client = getAmazonS3(params);
int abortedMultipartUploadsCount = 0;
try
{
// List upload markers. Null implies initial list request.
String uploadIdMarker = null;
String keyMarker = null;
boolean truncated;
do
{
// Create the list multipart request, optionally using the last markers.
ListMultipartUploadsRequest request = new ListMultipartUploadsRequest(params.getS3BucketName());
request.setUploadIdMarker(uploadIdMarker);
request.setKeyMarker(keyMarker);
// Request the multipart upload listing.
MultipartUploadListing uploadListing = s3Operations.listMultipartUploads(TransferManager.appendSingleObjectUserAgent(request), s3Client);
for (MultipartUpload upload : uploadListing.getMultipartUploads())
{
if (upload.getInitiated().compareTo(thresholdDate) < 0)
{
// Abort the upload.
s3Operations.abortMultipartUpload(TransferManager
.appendSingleObjectUserAgent(new AbortMultipartUploadRequest(params.getS3BucketName(), upload.getKey(), upload.getUploadId())),
s3Client);
// Log the information about the aborted multipart upload.
LOGGER.info("Aborted S3 multipart upload. s3Key=\"{}\" s3BucketName=\"{}\" s3MultipartUploadInitiatedDate=\"{}\"", upload.getKey(),
params.getS3BucketName(), upload.getInitiated());
// Increment the counter.
abortedMultipartUploadsCount++;
}
}
// Determine whether there are more uploads to list.
truncated = uploadListing.isTruncated();
if (truncated)
{
// Record the list markers.
uploadIdMarker = uploadListing.getNextUploadIdMarker();
keyMarker = uploadListing.getNextKeyMarker();
}
}
while (truncated);
}
finally
{
// Shutdown the Amazon S3 client instance to release resources.
s3Client.shutdown();
}
return abortedMultipartUploadsCount;
}
@Test
public void testAbortMultipartUploadsAssertTruncatedResult()
{
S3Operations originalS3Operations = (S3Operations) ReflectionTestUtils.getField(s3Dao, "s3Operations");
S3Operations mockS3Operations = mock(S3Operations.class);
ReflectionTestUtils.setField(s3Dao, "s3Operations", mockS3Operations);
try
{
String s3BucketName = "s3BucketName";
String uploadKey = "uploadKey";
String uploadId = "uploadId";
Date uploadInitiated = new Date(0);
S3FileTransferRequestParamsDto s3FileTransferRequestParamsDto = new S3FileTransferRequestParamsDto();
s3FileTransferRequestParamsDto.setS3BucketName(s3BucketName);
Date thresholdDate = new Date(1);
when(mockS3Operations.listMultipartUploads(any(), any())).then(new Answer<MultipartUploadListing>()
{
@Override
public MultipartUploadListing answer(InvocationOnMock invocation) throws Throwable
{
ListMultipartUploadsRequest listMultipartUploadsRequest = invocation.getArgument(0);
String keyMarker = listMultipartUploadsRequest.getKeyMarker();
String uploadIdMarker = listMultipartUploadsRequest.getUploadIdMarker();
MultipartUploadListing multipartUploadListing = new MultipartUploadListing();
if (keyMarker == null || uploadIdMarker == null)
{
multipartUploadListing.setNextKeyMarker("nextKeyMarker");
multipartUploadListing.setNextUploadIdMarker("nextUploadIdMarker");
multipartUploadListing.setTruncated(true);
}
else
{
assertEquals("nextKeyMarker", keyMarker);
assertEquals("nextUploadIdMarker", uploadIdMarker);
MultipartUpload multipartUpload = new MultipartUpload();
multipartUpload.setUploadId(uploadId);
multipartUpload.setKey(uploadKey);
multipartUpload.setInitiated(uploadInitiated);
multipartUploadListing.getMultipartUploads().add(multipartUpload);
}
return multipartUploadListing;
}
});
assertEquals(1, s3Dao.abortMultipartUploads(s3FileTransferRequestParamsDto, thresholdDate));
// Assert listMultipartUploads() is called twice due to truncation
verify(mockS3Operations, times(2)).listMultipartUploads(any(), any());
/*
* Assert that S3Operations.abortMultipartUpload is called exactly ONCE with arguments matching the given ArgumentMatcher
*/
verify(mockS3Operations).abortMultipartUpload(argThat(
argument -> Objects.equal(s3BucketName, argument.getBucketName()) && Objects.equal(uploadKey, argument.getKey()) &&
Objects.equal(uploadId, argument.getUploadId())), any());
// Assert that no other interactions occur with the mock
verifyNoMoreInteractions(mockS3Operations);
}
finally
{
ReflectionTestUtils.setField(s3Dao, "s3Operations", originalS3Operations);
}
}
/** Unsupported Operation. */
@Override public MultipartUploadListing listMultipartUploads(
ListMultipartUploadsRequest req) throws SdkClientException {
throw new UnsupportedOperationException("Operation not supported");
}
@Test
public void testMultipartUploadAbort() throws Exception {
String blobName = "multipart-upload-abort";
ByteSource byteSource = TestUtils.randomByteSource().slice(
0, MINIMUM_MULTIPART_SIZE);
InitiateMultipartUploadResult result = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(containerName, blobName));
// TODO: google-cloud-storage and openstack-swift cannot list multipart
// uploads
MultipartUploadListing multipartListing = client.listMultipartUploads(
new ListMultipartUploadsRequest(containerName));
if (blobStoreType.equals("azureblob")) {
// Azure does not create a manifest during initiate multi-part
// upload. Instead the first part creates this.
assertThat(multipartListing.getMultipartUploads()).isEmpty();
} else {
assertThat(multipartListing.getMultipartUploads()).hasSize(1);
}
PartListing partListing = client.listParts(new ListPartsRequest(
containerName, blobName, result.getUploadId()));
assertThat(partListing.getParts()).isEmpty();
client.uploadPart(new UploadPartRequest()
.withBucketName(containerName)
.withKey(blobName)
.withUploadId(result.getUploadId())
.withPartNumber(1)
.withPartSize(byteSource.size())
.withInputStream(byteSource.openStream()));
multipartListing = client.listMultipartUploads(
new ListMultipartUploadsRequest(containerName));
assertThat(multipartListing.getMultipartUploads()).hasSize(1);
partListing = client.listParts(new ListPartsRequest(
containerName, blobName, result.getUploadId()));
assertThat(partListing.getParts()).hasSize(1);
client.abortMultipartUpload(new AbortMultipartUploadRequest(
containerName, blobName, result.getUploadId()));
multipartListing = client.listMultipartUploads(
new ListMultipartUploadsRequest(containerName));
if (blobStoreType.equals("azureblob")) {
// Azure does not support explicit abort. It automatically
// removes incomplete multi-part uploads after 7 days.
assertThat(multipartListing.getMultipartUploads()).hasSize(1);
} else {
assertThat(multipartListing.getMultipartUploads()).isEmpty();
}
ObjectListing listing = client.listObjects(containerName);
assertThat(listing.getObjectSummaries()).isEmpty();
}
protected void ageoffS3Uploads(final ProcessContext context, final AmazonS3Client s3, final long now, String bucket) {
MultipartUploadListing oldUploads = getS3AgeoffListAndAgeoffLocalState(context, s3, now, bucket);
for (MultipartUpload upload : oldUploads.getMultipartUploads()) {
abortS3MultipartUpload(s3, oldUploads.getBucketName(), upload);
}
}
protected MultipartUploadListing getS3AgeoffListAndAgeoffLocalState(final ProcessContext context, final AmazonS3Client s3, final long now, String bucket) {
final long ageoff_interval = context.getProperty(MULTIPART_S3_AGEOFF_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAge = context.getProperty(MULTIPART_S3_MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long ageCutoff = now - maxAge;
final List<MultipartUpload> ageoffList = new ArrayList<>();
if ((lastS3AgeOff.get() < now - ageoff_interval) && s3BucketLock.tryLock()) {
try {
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
for (MultipartUpload upload : listing.getMultipartUploads()) {
long uploadTime = upload.getInitiated().getTime();
if (uploadTime < ageCutoff) {
ageoffList.add(upload);
}
}
// ageoff any local state
ageoffLocalState(ageCutoff);
lastS3AgeOff.set(System.currentTimeMillis());
} catch(AmazonClientException e) {
if (e instanceof AmazonS3Exception
&& ((AmazonS3Exception)e).getStatusCode() == 403
&& ((AmazonS3Exception) e).getErrorCode().equals("AccessDenied")) {
getLogger().warn("AccessDenied checking S3 Multipart Upload list for {}: {} " +
"** The configured user does not have the s3:ListBucketMultipartUploads permission " +
"for this bucket, S3 ageoff cannot occur without this permission. Next ageoff check " +
"time is being advanced by interval to prevent checking on every upload **",
new Object[]{bucket, e.getMessage()});
lastS3AgeOff.set(System.currentTimeMillis());
} else {
getLogger().error("Error checking S3 Multipart Upload list for {}: {}",
new Object[]{bucket, e.getMessage()});
}
} finally {
s3BucketLock.unlock();
}
}
MultipartUploadListing result = new MultipartUploadListing();
result.setBucketName(bucket);
result.setMultipartUploads(ageoffList);
return result;
}
@Test
public void testLocalStatePersistence() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-v2");
upload2.setUploadId("1234");
uploadList.add(upload2);
final MultipartUpload upload3 = new MultipartUpload();
upload3.setKey(key + "-v3");
upload3.setUploadId("5678");
uploadList.add(upload3);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* reload and validate stored state
*/
final PutS3Object.MultipartState state1new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey1);
Assert.assertEquals("", state1new.getUploadId());
Assert.assertEquals(0L, state1new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
Assert.assertEquals(0L, state1new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
Assert.assertEquals(0L, state1new.getContentLength().longValue());
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(0L, state2new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
Assert.assertEquals(0L, state2new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
Assert.assertEquals(1234L, state2new.getContentLength().longValue());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(0L, state3new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
Assert.assertEquals(0L, state3new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
Assert.assertEquals(5678L, state3new.getContentLength().longValue());
}
@Test
public void testStatePersistsETags() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
/*
* persist state to caches so that
* 1. v2 has 2 and then 4 tags
* 2. v3 has 4 and then 2 tags
*/
state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
processor.persistLocalState(cacheKey2, state2orig);
state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
processor.persistLocalState(cacheKey2, state2orig);
state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
processor.persistLocalState(cacheKey3, state3orig);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key + "-bv2");
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-bv3");
upload2.setUploadId("5678");
uploadList.add(upload2);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* load state and validate that
* 1. v2 restore shows 4 tags
* 2. v3 restore shows 2 tags
*/
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(4, state2new.getPartETags().size());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(2, state3new.getPartETags().size());
}
@Test
public void testS3MultipartAgeoff() throws InterruptedException, IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final ProcessContext context = runner.getProcessContext();
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
// set check interval and age off to minimum values
runner.setProperty(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL, "1 milli");
runner.setProperty(PutS3Object.MULTIPART_S3_MAX_AGE, "1 milli");
// create some dummy uploads
for (Integer i = 0; i < 3; i++) {
final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(
BUCKET_NAME, "file" + i.toString() + ".txt");
try {
client.initiateMultipartUpload(initiateRequest);
} catch (AmazonClientException e) {
Assert.fail("Failed to initiate upload: " + e.getMessage());
}
}
// Age off is time dependent, so test has some timing constraints. This
// sleep() delays long enough to satisfy interval and age intervals.
Thread.sleep(2000L);
// System millis are used for timing, but it is incremented on each
// call to circumvent what appears to be caching in the AWS library.
// The increments are 1000 millis because AWS returns upload
// initiation times in whole seconds.
Long now = System.currentTimeMillis();
MultipartUploadListing uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now, BUCKET_NAME);
Assert.assertEquals(3, uploadList.getMultipartUploads().size());
MultipartUpload upload0 = uploadList.getMultipartUploads().get(0);
processor.abortS3MultipartUpload(client, BUCKET_NAME, upload0);
uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now+1000, BUCKET_NAME);
Assert.assertEquals(2, uploadList.getMultipartUploads().size());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test-upload.txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
runner.run();
uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now+2000, BUCKET_NAME);
Assert.assertEquals(0, uploadList.getMultipartUploads().size());
}
public void setListing(MultipartUploadListing newlisting) {
listing = newlisting;
}
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest)
throws AmazonClientException, AmazonServiceException {
return listing;
}
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest request) throws AmazonClientException,
AmazonServiceException {
// TODO Auto-generated method stub
return null;
}
/**
* Lists in-progress multipart uploads. An in-progress multipart upload is a multipart upload that has been initiated, but has not yet been completed or
* aborted. This operation returns at most 1,000 multipart uploads in the response by default.
*
* @param listMultipartUploadsRequest the request object that specifies all the parameters of this operation
* @param s3Client the {@link AmazonS3} implementation to use
*
* @return the listing of multipart uploads from Amazon S3
*/
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest, AmazonS3 s3Client);