下面列出了com.amazonaws.services.s3.model.VersionListing#getVersionSummaries ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private boolean deleteObject(final String bucket, final String key) {
this.s3Client.deleteObject(bucket, key);
final VersionListing versionList = this.s3Client.listVersions(bucket, key);
for (final S3VersionSummary summary : versionList.getVersionSummaries()) {
logger.debug("deleting version {}", summary.getKey());
this.s3Client.deleteVersion(bucket, summary.getKey(), summary.getVersionId());
}
return true;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
try {
restoreState(context);
} catch (IOException ioe) {
getLogger().error("Failed to restore processor state; yielding", ioe);
context.yield();
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final AmazonS3 client = getClient();
int listCount = 0;
long maxTimestamp = 0L;
String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
S3BucketLister bucketLister = useVersions
? new S3VersionBucketLister(client)
: new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
if (delimiter != null && !delimiter.isEmpty()) {
bucketLister.setDelimiter(delimiter);
}
if (prefix != null && !prefix.isEmpty()) {
bucketLister.setPrefix(prefix);
}
VersionListing versionListing;
do {
versionListing = bucketLister.listVersions();
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) {
continue;
}
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(lastModified));
attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
// Create the flowfile
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
currentKeys.clear();
}
if (lastModified == maxTimestamp) {
currentKeys.add(versionSummary.getKey());
}
listCount++;
}
bucketLister.setNextMarker();
commit(context, session, listCount);
listCount = 0;
} while (bucketLister.isTruncated());
currentTimestamp = maxTimestamp;
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
if (!commit(context, session, listCount)) {
if (currentTimestamp > 0) {
persistState(context);
}
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
context.yield();
}
}
private void delete(VersionListing versionListing) {
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
s3.deleteVersion(versionSummary.getBucketName(), versionSummary.getKey(), versionSummary.getVersionId());
}
}