下面列出了com.amazonaws.services.s3.model.MultipartUploadListing#setMultipartUploads ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testStateRemove() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* store state, retrieve and validate, remove and validate
*/
PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
stateOrig.setUploadId("1234");
stateOrig.setContentLength(1234L);
processor.persistLocalState(cacheKey, stateOrig);
PutS3Object.MultipartState state1 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertEquals("1234", state1.getUploadId());
Assert.assertEquals(1234L, state1.getContentLength().longValue());
processor.persistLocalState(cacheKey, null);
PutS3Object.MultipartState state2 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertNull(state2);
}
/**
* <p> Returns a mock {@link MultipartUploadListing}. </p> <p> The return object has the following properties. <dl> <dt>multipartUploads</dt> <dd>Length 3
* list</dd> <p/> <dt>multipartUploads[0].initiated</dt> <dd>5 minutes prior to the object creation time.</dd> <p/> <dt>multipartUploads[1].initiated</dt>
* <dd>15 minutes prior to the object creation time.</dd> <p/> <dt>multipartUploads[2].initiated</dt> <dd>20 minutes prior to the object creation time.</dd>
* </dl> <p/> All other properties as set to default as defined in the by {@link MultipartUploadListing} constructor. </p>
*
* @return a mock object
*/
private MultipartUploadListing getMultipartUploadListing()
{
// Return 3 multipart uploads with 2 of them started more than 10 minutes ago.
MultipartUploadListing multipartUploadListing = new MultipartUploadListing();
List<MultipartUpload> multipartUploads = new ArrayList<>();
multipartUploadListing.setMultipartUploads(multipartUploads);
Date now = new Date();
multipartUploads.add(getMultipartUpload(HerdDateUtils.addMinutes(now, -5)));
multipartUploads.add(getMultipartUpload(HerdDateUtils.addMinutes(now, -15)));
multipartUploads.add(getMultipartUpload(HerdDateUtils.addMinutes(now, -20)));
return multipartUploadListing;
}
@Test
public void testStateRemove() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* store state, retrieve and validate, remove and validate
*/
PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
stateOrig.setUploadId("1234");
stateOrig.setContentLength(1234L);
processor.persistLocalState(cacheKey, stateOrig);
PutS3Object.MultipartState state1 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertEquals("1234", state1.getUploadId());
Assert.assertEquals(1234L, state1.getContentLength().longValue());
processor.persistLocalState(cacheKey, null);
PutS3Object.MultipartState state2 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
Assert.assertNull(state2);
}
protected MultipartUploadListing getS3AgeoffListAndAgeoffLocalState(final ProcessContext context, final AmazonS3Client s3, final long now) {
final long ageoff_interval = context.getProperty(MULTIPART_S3_AGEOFF_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final Long maxAge = context.getProperty(MULTIPART_S3_MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long ageCutoff = now - maxAge;
final List<MultipartUpload> ageoffList = new ArrayList<>();
if ((lastS3AgeOff.get() < now - ageoff_interval) && s3BucketLock.tryLock()) {
try {
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
for (MultipartUpload upload : listing.getMultipartUploads()) {
long uploadTime = upload.getInitiated().getTime();
if (uploadTime < ageCutoff) {
ageoffList.add(upload);
}
}
// ageoff any local state
ageoffLocalState(ageCutoff);
lastS3AgeOff.set(System.currentTimeMillis());
} catch(AmazonClientException e) {
if (e instanceof AmazonS3Exception
&& ((AmazonS3Exception)e).getStatusCode() == 403
&& ((AmazonS3Exception) e).getErrorCode().equals("AccessDenied")) {
getLogger().warn("AccessDenied checking S3 Multipart Upload list for {}: {} " +
"** The configured user does not have the s3:ListBucketMultipartUploads permission " +
"for this bucket, S3 ageoff cannot occur without this permission. Next ageoff check " +
"time is being advanced by interval to prevent checking on every upload **",
new Object[]{bucket, e.getMessage()});
lastS3AgeOff.set(System.currentTimeMillis());
} else {
getLogger().error("Error checking S3 Multipart Upload list for {}: {}",
new Object[]{bucket, e.getMessage()});
}
} finally {
s3BucketLock.unlock();
}
}
MultipartUploadListing result = new MultipartUploadListing();
result.setBucketName(bucket);
result.setMultipartUploads(ageoffList);
return result;
}
@Test
public void testLocalStatePersistence() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-v2");
upload2.setUploadId("1234");
uploadList.add(upload2);
final MultipartUpload upload3 = new MultipartUpload();
upload3.setKey(key + "-v3");
upload3.setUploadId("5678");
uploadList.add(upload3);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* reload and validate stored state
*/
final PutS3Object.MultipartState state1new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey1);
Assert.assertEquals("", state1new.getUploadId());
Assert.assertEquals(0L, state1new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
Assert.assertEquals(0L, state1new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
Assert.assertEquals(0L, state1new.getContentLength().longValue());
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(0L, state2new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
Assert.assertEquals(0L, state2new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
Assert.assertEquals(1234L, state2new.getContentLength().longValue());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(0L, state3new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
Assert.assertEquals(0L, state3new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
Assert.assertEquals(5678L, state3new.getContentLength().longValue());
}
@Test
public void testStatePersistsETags() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
/*
* persist state to caches so that
* 1. v2 has 2 and then 4 tags
* 2. v3 has 4 and then 2 tags
*/
state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
processor.persistLocalState(cacheKey2, state2orig);
state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
processor.persistLocalState(cacheKey2, state2orig);
state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
processor.persistLocalState(cacheKey3, state3orig);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key + "-bv2");
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-bv3");
upload2.setUploadId("5678");
uploadList.add(upload2);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* load state and validate that
* 1. v2 restore shows 4 tags
* 2. v3 restore shows 2 tags
*/
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(4, state2new.getPartETags().size());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(2, state3new.getPartETags().size());
}
protected MultipartUploadListing getS3AgeoffListAndAgeoffLocalState(final ProcessContext context, final AmazonS3Client s3, final long now, String bucket) {
final long ageoff_interval = context.getProperty(MULTIPART_S3_AGEOFF_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAge = context.getProperty(MULTIPART_S3_MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long ageCutoff = now - maxAge;
final List<MultipartUpload> ageoffList = new ArrayList<>();
if ((lastS3AgeOff.get() < now - ageoff_interval) && s3BucketLock.tryLock()) {
try {
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
for (MultipartUpload upload : listing.getMultipartUploads()) {
long uploadTime = upload.getInitiated().getTime();
if (uploadTime < ageCutoff) {
ageoffList.add(upload);
}
}
// ageoff any local state
ageoffLocalState(ageCutoff);
lastS3AgeOff.set(System.currentTimeMillis());
} catch(AmazonClientException e) {
if (e instanceof AmazonS3Exception
&& ((AmazonS3Exception)e).getStatusCode() == 403
&& ((AmazonS3Exception) e).getErrorCode().equals("AccessDenied")) {
getLogger().warn("AccessDenied checking S3 Multipart Upload list for {}: {} " +
"** The configured user does not have the s3:ListBucketMultipartUploads permission " +
"for this bucket, S3 ageoff cannot occur without this permission. Next ageoff check " +
"time is being advanced by interval to prevent checking on every upload **",
new Object[]{bucket, e.getMessage()});
lastS3AgeOff.set(System.currentTimeMillis());
} else {
getLogger().error("Error checking S3 Multipart Upload list for {}: {}",
new Object[]{bucket, e.getMessage()});
}
} finally {
s3BucketLock.unlock();
}
}
MultipartUploadListing result = new MultipartUploadListing();
result.setBucketName(bucket);
result.setMultipartUploads(ageoffList);
return result;
}
@Test
public void testLocalStatePersistence() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key);
upload1.setUploadId("");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-v2");
upload2.setUploadId("1234");
uploadList.add(upload2);
final MultipartUpload upload3 = new MultipartUpload();
upload3.setKey(key + "-v3");
upload3.setUploadId("5678");
uploadList.add(upload3);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* reload and validate stored state
*/
final PutS3Object.MultipartState state1new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey1);
Assert.assertEquals("", state1new.getUploadId());
Assert.assertEquals(0L, state1new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
Assert.assertEquals(0L, state1new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
Assert.assertEquals(0L, state1new.getContentLength().longValue());
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(0L, state2new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
Assert.assertEquals(0L, state2new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
Assert.assertEquals(1234L, state2new.getContentLength().longValue());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(0L, state3new.getFilePosition().longValue());
Assert.assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
Assert.assertEquals(0L, state3new.getPartSize().longValue());
Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
Assert.assertEquals(5678L, state3new.getContentLength().longValue());
}
@Test
public void testStatePersistsETags() throws IOException {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
/*
* store 3 versions of state
*/
PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
processor.persistLocalState(cacheKey1, state1orig);
PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
state2orig.setUploadId("1234");
state2orig.setContentLength(1234L);
processor.persistLocalState(cacheKey2, state2orig);
PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
state3orig.setUploadId("5678");
state3orig.setContentLength(5678L);
processor.persistLocalState(cacheKey3, state3orig);
/*
* persist state to caches so that
* 1. v2 has 2 and then 4 tags
* 2. v3 has 4 and then 2 tags
*/
state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
processor.persistLocalState(cacheKey2, state2orig);
state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
processor.persistLocalState(cacheKey2, state2orig);
state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
processor.persistLocalState(cacheKey3, state3orig);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
processor.persistLocalState(cacheKey3, state3orig);
final List<MultipartUpload> uploadList = new ArrayList<>();
final MultipartUpload upload1 = new MultipartUpload();
upload1.setKey(key + "-bv2");
upload1.setUploadId("1234");
uploadList.add(upload1);
final MultipartUpload upload2 = new MultipartUpload();
upload2.setKey(key + "-bv3");
upload2.setUploadId("5678");
uploadList.add(upload2);
final MultipartUploadListing uploadListing = new MultipartUploadListing();
uploadListing.setMultipartUploads(uploadList);
final MockAmazonS3Client mockClient = new MockAmazonS3Client();
mockClient.setListing(uploadListing);
/*
* load state and validate that
* 1. v2 restore shows 4 tags
* 2. v3 restore shows 2 tags
*/
final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
Assert.assertEquals("1234", state2new.getUploadId());
Assert.assertEquals(4, state2new.getPartETags().size());
final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
Assert.assertEquals("5678", state3new.getUploadId());
Assert.assertEquals(2, state3new.getPartETags().size());
}