下面列出了com.amazonaws.services.s3.model.GetObjectMetadataRequest#com.amazonaws.services.s3.model.ObjectListing 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Lists every object in the configured bucket and hands the content of each one to
 * {@code processLines}.
 * <p>
 * All listing pages are followed (a single {@code listObjects} call returns only one page),
 * and each {@link S3Object} is closed after processing so its underlying connection is released.
 * An {@link IOException} while processing one object is logged and that object is skipped.
 */
@Override
protected void doStart() {
    AWSCredentials myCredentials = new BasicAWSCredentials(accessKey, secretKey);
    AmazonS3 s3Client = new AmazonS3Client(myCredentials);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket);
    ObjectListing objectListing = s3Client.listObjects(listObjectsRequest);
    ChannelProcessor channelProcessor = getChannelProcessor();
    while (true) {
        for (S3ObjectSummary s3ObjectSummary : objectListing.getObjectSummaries()) {
            String file = s3ObjectSummary.getKey();
            LOGGER.info("Read the content of {}", file);
            GetObjectRequest objectRequest = new GetObjectRequest(bucket, file);
            // try-with-resources: an unclosed S3Object keeps its HTTP connection checked out.
            try (S3Object objectPortion = s3Client.getObject(objectRequest)) {
                long startTime = System.currentTimeMillis();
                processLines(channelProcessor, objectPortion.getObjectContent());
                LOGGER.info("Processing of {} took {} ms", file, System.currentTimeMillis() - startTime);
            } catch (IOException e) {
                LOGGER.warn("Cannot process the {}, skipping", file, e);
            }
        }
        // Only ask for another batch when the current page says there is one.
        if (!objectListing.isTruncated()) {
            break;
        }
        objectListing = s3Client.listNextBatchOfObjects(objectListing);
    }
}
/**
 * Derives file names from the keys of a listing by cutting off the first
 * {@code prefix.length()} characters of each key.
 * <p>
 * A key whose remainder is empty or still contains {@code DELIMITER} is reported
 * through the error log and left out of the result.
 *
 * @param listing listing whose object summaries are inspected
 * @param prefix  prefix whose length determines how much of each key is cut off
 * @return the remaining file names, in listing order
 */
private List<String> getFilenamesFromListing(ObjectListing listing, String prefix) {
    List<String> filenames = new ArrayList<String>(100);
    for (S3ObjectSummary entry : listing.getObjectSummaries()) {
        final String objectKey = entry.getKey();
        final String remainder = objectKey.substring(prefix.length());
        final boolean usable = remainder.length() != 0 && !remainder.contains(DELIMITER);
        if (usable) {
            filenames.add(remainder);
        } else {
            log.error("Error parsing S3 object Key. Key: " + objectKey);
        }
    }
    return filenames;
}
/**
 * Fallback for a failed attach: decides whether {@code keyWithDelimiter} denotes a folder by
 * listing the bucket under that prefix.
 * <p>
 * If the listing contains any common prefix or object summary the type is set to
 * {@link FileType#FOLDER}. Otherwise the original exception is inspected: an error code of
 * {@code "NoSuchKey"} is tolerated silently, anything else (including a missing error code)
 * is logged and rethrown as a {@link FileSystemException}.
 *
 * @param bucket           bucket to list
 * @param keyWithDelimiter key prefix, already terminated with the delimiter
 * @param exception        the exception raised by the earlier lookup
 * @throws FileSystemException when the original failure was not a plain "NoSuchKey"
 */
private void handleAttachExceptionFallback( String bucket, String keyWithDelimiter, AmazonS3Exception exception )
    throws FileSystemException {
  ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
      .withBucketName( bucket )
      .withPrefix( keyWithDelimiter )
      .withDelimiter( DELIMITER );
  ObjectListing ol = fileSystem.getS3Client().listObjects( listObjectsRequest );
  if ( !( ol.getCommonPrefixes().isEmpty() && ol.getObjectSummaries().isEmpty() ) ) {
    injectType( FileType.FOLDER );
  } else {
    //Folders don't really exist - they will generate a "NoSuchKey" exception
    // confirms key doesn't exist but connection okay
    String errorCode = exception.getErrorCode();
    // Constant-first comparison: the error code may be null (e.g. client-side failures),
    // and that case must surface as a FileSystemException rather than a NullPointerException.
    if ( !"NoSuchKey".equals( errorCode ) ) {
      // bubbling up other connection errors
      logger.error( "Could not get information on " + getQualifiedName(),
        exception ); // make sure this gets printed for the user
      throw new FileSystemException( "vfs.provider/get-type.error", getQualifiedName(), exception );
    }
  }
}
/**
 * Deletes the object stored under {@code key}. When this file object is a folder, every
 * object returned by listing {@code key} as a prefix is collected (across all pages) and
 * deleted first.
 *
 * @param key        key of the object (or folder prefix) to delete
 * @param bucketName bucket holding the object
 * @throws FileSystemException declared by the signature; not raised directly in this body
 */
protected void doDelete( String key, String bucketName ) throws FileSystemException {
  if ( getType() == FileType.FOLDER ) {
    // Gather the complete child list before issuing any delete.
    ArrayList<S3ObjectSummary> children = new ArrayList<>();
    ObjectListing page = fileSystem.getS3Client().listObjects( bucketName, key );
    for ( ;; ) {
      children.addAll( page.getObjectSummaries() );
      if ( !page.isTruncated() ) {
        break;
      }
      page = fileSystem.getS3Client().listNextBatchOfObjects( page );
    }
    for ( S3ObjectSummary child : children ) {
      fileSystem.getS3Client().deleteObject( bucketName, child.getKey() );
    }
  }
  // Finally remove the entry itself.
  fileSystem.getS3Client().deleteObject( bucketName, key );
}
/**
 * Collects the URIs of the objects found under the given URI.
 * <p>
 * The URI host is used as the bucket name and the path (without its leading slash) as the
 * key prefix. Every listing page is passed to {@code addKeyUris}, which applies the predicate.
 *
 * @param uri          location to list
 * @param uriPredicate filter forwarded to {@code addKeyUris}
 * @return the accumulated URIs
 * @throws IOException declared by the signature; not raised directly in this body
 */
@Override
public List<URI> listUris(URI uri, Predicate<URI> uriPredicate) throws IOException {
    String bucketName = uri.getHost();
    // The client is created on first use.
    if (client == null) {
        client = clientBuilder.client(uri);
    }
    String prefix = "";
    if (uri.getPath().length() > 1) {
        prefix = uri.getPath().substring(1);
    }
    List<URI> collected = new ArrayList<>();
    ObjectListing page = client.listObjects(bucketName, prefix);
    for (;;) {
        addKeyUris(collected, page, uri, uriPredicate);
        if (!page.isTruncated()) {
            break;
        }
        page = client.listNextBatchOfObjects(page);
    }
    return collected;
}
/**
 * When neither the key nor the key with a trailing slash can be fetched, and the listing
 * under it is empty, {@code doAttach()} must not throw and the type must be IMAGINARY.
 */
@Test
public void testHandleAttachExceptionFileNotFound() throws FileSystemException {
  // Lookup of the plain key fails with a "404 Not Found" error code.
  AmazonS3Exception plainKeyFailure = new AmazonS3Exception( "404 Not Found" );
  plainKeyFailure.setErrorCode( "404 Not Found" );
  when( s3ServiceMock.getObject( BUCKET_NAME, origKey ) ).thenThrow( plainKeyFailure );

  // Lookup of the folder-style key fails with "NoSuchKey".
  AmazonS3Exception folderKeyFailure = new AmazonS3Exception( "NoSuchKey" );
  folderKeyFailure.setErrorCode( "NoSuchKey" );
  when( s3ServiceMock.getObject( BUCKET_NAME, origKey + "/" ) ).thenThrow( folderKeyFailure );

  // The listing under the key has no objects and no common prefixes.
  childObjectListing = mock( ObjectListing.class );
  when( childObjectListing.getObjectSummaries() ).thenReturn( new ArrayList<>() );
  when( childObjectListing.getCommonPrefixes() ).thenReturn( new ArrayList<>() );
  when( s3ServiceMock.listObjects( any( ListObjectsRequest.class ) ) ).thenReturn( childObjectListing );

  try {
    s3FileObjectFileSpy.doAttach();
  } catch ( Exception unexpected ) {
    fail( "Caught exception " + unexpected.getMessage() );
  }

  assertEquals( FileType.IMAGINARY, s3FileObjectFileSpy.getType() );
}
/**
 * Counts the objects in the bucket that backs the given namespace.
 * <p>
 * Every listing page is counted, including the last one. The next batch is requested only
 * while the current page reports it is truncated; fetching first and testing afterwards
 * would drop the final page from the total.
 *
 * @param namespace namespace that {@code toBucketName} maps to a bucket
 * @return number of object summaries across all pages, or {@code -1} when the bucket does not exist
 */
private int doSize(final String namespace) {
    final String bucket = toBucketName(namespace);
    if (!this.s3Client.doesBucketExistV2(bucket)) {
        return -1;
    }
    int totalSize = 0;
    ObjectListing listing = this.s3Client.listObjects(bucket);
    while (true) {
        final int pageSize = listing.getObjectSummaries().size();
        totalSize += pageSize;
        logger.debug("got {} keys from {}", pageSize, listing);
        if (!listing.isTruncated()) {
            break;
        }
        listing = this.s3Client.listNextBatchOfObjects(listing);
    }
    return totalSize;
}
/**
 * Returns up to {@code count} keys from the bucket, skipping the first {@code start} keys in
 * listing order.
 * <p>
 * All pages are scanned, including the last one: the next batch is requested only while the
 * current page is truncated. Scanning stops early once {@code count} keys have been collected.
 *
 * @param bucket bucket to list
 * @param start  number of leading keys to skip
 * @param count  number of keys at which the scan returns early
 * @return the collected keys (a set, so listing order is not retained)
 */
private Collection<String> getKeys(final String bucket, final int start, final int count) {
    final Set<String> keys = new HashSet<>();
    int index = 0;
    ObjectListing listing = this.s3Client.listObjects(bucket);
    while (true) {
        for (final S3ObjectSummary summary : listing.getObjectSummaries()) {
            if (index < start) {
                logger.debug("skipping {} at index={} start={}", summary.getKey(), index, start);
                index++;
                continue;
            }
            keys.add(summary.getKey());
            if (keys.size() == count) {
                logger.debug("retrieved {}/{} keys, returning early", keys.size(), count);
                return keys;
            }
        }
        logger.debug("got {} keys from {}", listing.getObjectSummaries().size(), listing);
        // Test the page just processed; advancing first would skip the final page entirely.
        if (!listing.isTruncated()) {
            break;
        }
        listing = this.s3Client.listNextBatchOfObjects(listing);
    }
    return keys;
}
/**
 * Feeds every object under the given prefix to the consumer, skipping keys that end with "/".
 * <p>
 * Each key is made relative to the {@code clusterId/datacenterId/nodeId} path of the storage
 * location before it is converted by {@code objectKeyToRemoteReference}.
 *
 * @param prefix   reference whose canonical path is used as the listing prefix
 * @param consumer receiver of one reference per matching object
 */
@Override
public void consumeFiles(final RemoteObjectReference prefix, final Consumer<RemoteObjectReference> consumer) {
    final Path bucketPath = Paths.get(request.storageLocation.clusterId)
        .resolve(request.storageLocation.datacenterId)
        .resolve(request.storageLocation.nodeId);

    ObjectListing page = amazonS3.listObjects(request.storageLocation.bucket, prefix.canonicalPath);
    for (;;) {
        page.getObjectSummaries().stream()
            .map(summary -> summary.getKey())
            .filter(key -> !key.endsWith("/"))
            .forEach(key -> consumer.accept(objectKeyToRemoteReference(bucketPath.relativize(Paths.get(key)))));

        if (!page.isTruncated()) {
            return;
        }
        page = amazonS3.listNextBatchOfObjects(page);
    }
}
/**
 * Empties the named bucket page by page and then deletes the bucket itself.
 * Does nothing except log when the bucket does not exist.
 *
 * @param bucketName bucket to remove
 */
@Override
public void delete(final String bucketName) {
    if (!doesExist(bucketName)) {
        logger.info("Bucket was not deleted as it does not exist.");
        return;
    }

    ObjectListing page = transferManager.getAmazonS3Client().listObjects(bucketName);
    for (;;) {
        // Delegates removal of this page's objects to the three-argument overload.
        delete(transferManager.getAmazonS3Client(), page, bucketName);
        if (!page.isTruncated()) {
            break;
        }
        page = transferManager.getAmazonS3Client().listNextBatchOfObjects(page);
    }

    transferManager.getAmazonS3Client().deleteBucket(bucketName);
}
/**
 * Writing one batch to the target must produce exactly one event record carrying the
 * "/bucket" and "/objectKey" fields, with "/bucket" equal to the test bucket name.
 */
@Test
public void testEventRecords() throws Exception {
    String prefix = "testEventRecords";
    AmazonS3Target target = createS3targetWithTextData(prefix, false, "");
    TargetRunner runner = new TargetRunner.Builder(AmazonS3DTarget.class, target)
        .addService(DataFormatGeneratorService.class, new SdkJsonDataFormatGeneratorService())
        .build();
    runner.runInit();

    List<Record> inputRecords = TestUtil.createStringRecords(BUCKET_NAME);

    // Precondition: nothing is stored under the prefix yet.
    ObjectListing existing = s3client.listObjects(BUCKET_NAME, prefix);
    Assert.assertTrue(existing.getObjectSummaries().isEmpty());

    runner.runWrite(inputRecords);

    Assert.assertEquals(1, runner.getEventRecords().size());
    Record event = runner.getEventRecords().get(0);
    Assert.assertTrue(event.has("/bucket"));
    Assert.assertTrue(event.has("/objectKey"));
    Assert.assertEquals(BUCKET_NAME, event.get("/bucket").getValueAsString());

    runner.runDestroy();
}
/**
 * Runs {@code processListing} over every listing page of the requested bucket and reports
 * the succeeded and failed entries, with their counts, as a JSON response.
 *
 * @return a completed response wrapping the JSON summary
 */
@Override
protected Response serve() {
    String bucket = _bucket.value();
    Log.info("ImportS3 processing (" + bucket + ")");

    JsonArray succ = new JsonArray();
    JsonArray fail = new JsonArray();
    AmazonS3 s3 = PersistS3.getClient();
    ObjectListing page = s3.listObjects(bucket);
    for (;;) {
        processListing(page, succ, fail);
        if (!page.isTruncated()) {
            break;
        }
        page = s3.listNextBatchOfObjects(page);
    }

    JsonObject json = new JsonObject();
    json.add(NUM_SUCCEEDED, new JsonPrimitive(succ.size()));
    json.add(SUCCEEDED, succ);
    json.add(NUM_FAILED, new JsonPrimitive(fail.size()));
    json.add(FAILED, fail);
    DKV.write_barrier();

    Response response = Response.done(json);
    response.setBuilder(SUCCEEDED + "." + KEY, new KeyCellBuilder());
    return response;
}
/**
 * In-memory stand-in for a bucket listing: asserts the request targets the expected bucket
 * and returns one summary per entry of {@code blobs} whose key starts with the requested
 * prefix (or every entry when the prefix is empty).
 *
 * @param request listing request; its bucket name and prefix are read
 * @return a listing populated from the in-memory blobs
 */
@Override
public ObjectListing listObjects(final ListObjectsRequest request) throws AmazonClientException {
    assertThat(request.getBucketName(), equalTo(bucket));

    final ObjectListing result = new ObjectListing();
    result.setBucketName(request.getBucketName());
    result.setPrefix(request.getPrefix());

    for (Map.Entry<String, byte[]> entry : blobs.entrySet()) {
        final boolean excluded =
            !Strings.isEmpty(request.getPrefix()) && !entry.getKey().startsWith(request.getPrefix());
        if (excluded) {
            continue;
        }
        S3ObjectSummary item = new S3ObjectSummary();
        item.setBucketName(request.getBucketName());
        item.setKey(entry.getKey());
        item.setSize(entry.getValue().length);
        result.getObjectSummaries().add(item);
    }
    return result;
}
/**
 * Deletes every object under {@code pathPrefix} in the given bucket, one multi-object delete
 * request per listing page.
 * <p>
 * A page with no object summaries is skipped, because a multi-object delete request without
 * any key is rejected by the service. This makes deleting an empty prefix a no-op.
 *
 * @param bucketName bucket to delete from
 * @param pathPrefix key prefix selecting the objects to delete
 */
public void delete(String bucketName, String pathPrefix) {
    ObjectListing listing = null;
    do {
        listing = listing == null ? helper.client().listObjects(bucketName, pathPrefix) : helper.client().listNextBatchOfObjects(listing);
        List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>(listing.getObjectSummaries().size());
        for (S3ObjectSummary summary : listing.getObjectSummaries()) {
            keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
        }
        if (!keys.isEmpty()) {
            DeleteObjectsRequest req = new DeleteObjectsRequest(bucketName);
            req.withKeys(keys);
            helper.client().deleteObjects(req);
        }
    } while (listing.isTruncated());
}
/**
 * Downloads every object of the configured infected-files bucket into memory.
 * <p>
 * Returns an empty list when no bucket name is configured. Each {@link S3Object} is closed
 * as soon as its content has been read, so connections are not leaked while iterating.
 *
 * @return one {@code InfectedFile} (key, content, content type) per object in the bucket
 * @throws RuntimeException wrapping the {@link IOException} if an object's content cannot be read
 */
public List<InfectedFile> getFiles() {
    final List<InfectedFile> files = new ArrayList<>();
    if (Config.has(Config.Key.INFECTED_FILES_BUCKET_NAME)) {
        final AmazonS3 s3local = AmazonS3ClientBuilder.standard().withCredentials(this.credentialsProvider).withRegion(Config.get(Config.Key.INFECTED_FILES_BUCKET_REGION)).build();
        ObjectListing objectListing = s3local.listObjects(Config.get(Config.Key.INFECTED_FILES_BUCKET_NAME));
        while (true) {
            objectListing.getObjectSummaries().forEach((summary) -> {
                // try-with-resources releases the HTTP connection held by the object.
                try (S3Object object = s3local.getObject(summary.getBucketName(), summary.getKey())) {
                    final byte[] content = IOUtils.toByteArray(object.getObjectContent());
                    files.add(new InfectedFile(summary.getKey(), content, object.getObjectMetadata().getContentType()));
                } catch (final IOException e) {
                    throw new RuntimeException(e);
                }
            });
            if (objectListing.isTruncated()) {
                objectListing = s3local.listNextBatchOfObjects(objectListing);
            } else {
                break;
            }
        }
    }
    return files;
}
/**
 * Builds a mock answer that turns the intercepted {@code ListObjectsRequest} into a listing.
 * <p>
 * The listing copies bucket name and prefix from the request, contains one 100-byte summary
 * per file name (keyed as prefix + file name), and is marked truncated exactly when
 * {@code marker} is non-null.
 *
 * @param marker    next marker to report, or {@code null} for a final page
 * @param fileNames names appended to the request prefix to form the keys
 * @return the answer producing such a listing
 */
private Answer<ObjectListing> objectListingAnswer(@Nullable final String marker, final String... fileNames) {
    return new Answer<ObjectListing>() {
        @Override
        public ObjectListing answer(InvocationOnMock invocation)
            throws Throwable {
            Object firstArgument = invocation.getArguments()[0];
            ListObjectsRequest listRequest = (ListObjectsRequest) firstArgument;

            ObjectListing page = new ObjectListing();
            page.setBucketName(listRequest.getBucketName());
            page.setPrefix(listRequest.getPrefix());
            boolean hasMore = marker != null;
            page.setTruncated(hasMore);
            page.setNextMarker(marker);

            for (int i = 0; i < fileNames.length; i++) {
                S3ObjectSummary entry = new S3ObjectSummary();
                entry.setKey(listRequest.getPrefix() + fileNames[i]);
                entry.setSize(100);
                page.getObjectSummaries().add(entry);
            }
            return page;
        }
    };
}
/**
 * Deletes every object stored under the tenant's folder in the configured bucket.
 * <p>
 * All listing pages are processed, including the last one: the next batch is fetched only
 * while the page just handled is truncated. Fetching first and testing afterwards would
 * leave the objects on the final page undeleted.
 *
 * @param tenant tenant whose sanitized name is used as the folder prefix
 */
@Override
public void deleteByTenant(final String tenant) {
    final String folder = sanitizeTenant(tenant);
    LOG.info("Deleting S3 object folder (tenant) from bucket {} and key {}", s3Properties.getBucketName(), folder);
    // Delete artifacts
    ObjectListing objects = amazonS3.listObjects(s3Properties.getBucketName(), folder + "/");
    while (true) {
        for (final S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
            amazonS3.deleteObject(s3Properties.getBucketName(), objectSummary.getKey());
        }
        if (!objects.isTruncated()) {
            break;
        }
        objects = amazonS3.listNextBatchOfObjects(objects);
    }
}
/**
 * Deletes every object in the bucket, page by page, and then deletes the bucket.
 *
 * @param client     client used for all calls
 * @param bucketName bucket to empty and remove
 */
public static void emptyAndDeleteBucket(AmazonS3Client client,
        String bucketName) {
    ObjectListing page = client.listObjects(bucketName);
    boolean morePages;
    do {
        for (S3ObjectSummary entry : page.getObjectSummaries()) {
            client.deleteObject(bucketName, entry.getKey());
        }
        morePages = page.isTruncated();
        if (morePages) {
            page = client.listNextBatchOfObjects(page);
        }
    } while (morePages);
    client.deleteBucket(bucketName);
}
/**
 * Lists the keys found under {@code lockPrefix} in the configured bucket.
 * <p>
 * Only the listing returned by a single {@code listObjects} call is used. The result is the
 * transformed view produced by {@code Lists.transform} over that listing's summaries.
 *
 * @param lockPrefix key prefix to list
 * @return the keys of the listed objects
 */
@Override
protected List<String> getFileNames(String lockPrefix) throws Exception
{
    ListObjectsRequest listRequest = new ListObjectsRequest();
    listRequest.setBucketName(bucket);
    listRequest.setPrefix(lockPrefix);

    Function<S3ObjectSummary, String> keyOf = new Function<S3ObjectSummary, String>()
    {
        @Override
        public String apply(S3ObjectSummary entry)
        {
            return entry.getKey();
        }
    };

    ObjectListing page = client.listObjects(listRequest);
    return Lists.transform(page.getObjectSummaries(), keyOf);
}
/**
 * Removes the bucket behind a blob store configuration when it holds no objects.
 * <p>
 * For a non-empty bucket the bucket is kept and only its lifecycle configuration is adjusted:
 * the rules returned by {@code nonBlobstoreRules} are written back, or the lifecycle
 * configuration is deleted when no such rules remain.
 *
 * @param blobStoreConfiguration configuration naming the bucket and the blob store
 */
@Override
public void deleteStorageLocation(final BlobStoreConfiguration blobStoreConfiguration) {
    String bucket = getConfiguredBucket(blobStoreConfiguration);
    // A single key is enough to tell whether the bucket is empty.
    ListObjectsRequest probe = new ListObjectsRequest().withBucketName(bucket).withMaxKeys(1);
    ObjectListing firstKey = s3.listObjects(probe);
    if (listingIsEmpty(firstKey)) {
        s3.deleteBucket(bucket);
        return;
    }

    log.info("Not removing S3 bucket {} because it is not empty", bucket);
    BucketLifecycleConfiguration lifecycle = s3.getBucketLifecycleConfiguration(bucket);
    List<Rule> remainingRules = nonBlobstoreRules(lifecycle, blobStoreConfiguration.getName());
    if (isEmpty(remainingRules)) {
        s3.deleteBucketLifecycleConfiguration(bucket);
    } else {
        lifecycle.setRules(remainingRules);
        s3.setBucketLifecycleConfiguration(bucket, lifecycle);
    }
}

/** Tells whether the listing carries no object summaries. */
private static boolean listingIsEmpty(final ObjectListing listing) {
    return listing.getObjectSummaries().isEmpty();
}
/**
 * Appends eight records, commits the sink, and expects four objects under the "10" prefix,
 * each retrievable under a key of the form "10/i_(i+1)" for even i.
 */
@Test
public void testUploadManyFiles() throws Exception {
    init("s3.default.properties");

    for (int n = 0; n < 8; n++) {
        String text = Integer.toString(n);
        sink.append(new Record(text, Integer.toString(n).getBytes()));
    }

    // Checksums of the buffered files are computed before the commit.
    List<String> md5s = new ArrayList<>();
    for (String path : sink.getBuffer().getFiles()) {
        md5s.add(new String(Md5Utils.md5AsBase64(new File(path))));
    }

    sink.prepareCommit();
    sink.commit();

    ObjectListing uploaded = s3.listObjects(s3SinkConfig.getBucketName(), "10");
    assertTrue(uploaded.getObjectSummaries().size() == 4);

    for (int first = 0; first < 8; first += 2) {
        String expectedKey = "10/" + first + "_" + (first + 1);
        S3Object stored = s3.getObject(s3SinkConfig.getBucketName(), expectedKey);
        assertNotNull(stored);
    }
}
/**
 * Deletes every object whose key starts with {@code user/notebook/noteId}.
 * <p>
 * All listing pages are processed, including the last one: the next batch is fetched only
 * while the page just handled is truncated, so no objects are left behind for large notes.
 *
 * @param noteId  id of the note to remove
 * @param subject authentication info (not used in this body)
 * @throws IOException wrapping any {@code AmazonClientException} raised by the client
 */
@Override
public void remove(String noteId, AuthenticationInfo subject) throws IOException {
    String key = user + "/" + "notebook" + "/" + noteId;
    final ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
        .withBucketName(bucketName).withPrefix(key);
    try {
        ObjectListing objects = s3client.listObjects(listObjectsRequest);
        while (true) {
            for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
                s3client.deleteObject(bucketName, objectSummary.getKey());
            }
            if (!objects.isTruncated()) {
                break;
            }
            objects = s3client.listNextBatchOfObjects(objects);
        }
    }
    catch (AmazonClientException ace) {
        throw new IOException("Unable to remove note in S3: " + ace, ace);
    }
}
/**
 * Deletes every object stored under {@code rootFolder + folderPath + "/"}.
 * <p>
 * All listing pages are processed, including the last one: the next batch is fetched only
 * while the page just handled is truncated, so large folders are removed completely.
 *
 * @param folderPath folder to remove, relative to the root folder
 * @param subject    authentication info (not used in this body)
 * @throws IOException wrapping any {@code AmazonClientException} raised by the client
 */
@Override
public void remove(String folderPath, AuthenticationInfo subject) throws IOException {
    final ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
        .withBucketName(bucketName).withPrefix(rootFolder + folderPath + "/");
    try {
        ObjectListing objects = s3client.listObjects(listObjectsRequest);
        while (true) {
            for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
                s3client.deleteObject(bucketName, objectSummary.getKey());
            }
            if (!objects.isTruncated()) {
                break;
            }
            objects = s3client.listNextBatchOfObjects(objects);
        }
    } catch (AmazonClientException ace) {
        throw new IOException("Unable to remove folder " + folderPath + " in S3", ace);
    }
}
/**
 * Reads all lines of the objects listed under the temporary folder's "/result/" prefix.
 * The assertion on the collected lines is currently disabled (see the FIXME below).
 */
protected void validateTdSparkQueryOutput()
{
    AmazonS3URI resultUri = new AmazonS3URI(tmpS3FolderUri.toString() + "/result/");
    ListObjectsRequest listRequest = new ListObjectsRequest()
            .withBucketName(resultUri.getBucket())
            .withPrefix(resultUri.getKey());
    ObjectListing resultListing = s3.listObjects(listRequest);
    List<String> resultLines = resultListing.getObjectSummaries().stream()
            .flatMap(summary -> {
                // Each object is closed once its lines have been read into memory.
                try (S3Object stored = s3.getObject(summary.getBucketName(), summary.getKey())) {
                    List<String> lines = CharStreams.readLines(new InputStreamReader(stored.getObjectContent()));
                    return lines.stream();
                }
                catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            })
            .collect(toList());
    // FIXME
    // we need to specify the version of td-spark that we can use for acceptance tests.
    // In the meantime the assertion is commented out.
    //assertThat(resultLines, Matchers.hasItem(",164.54.104.106,/item/games/4663,/category/electronics,404,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0),121,GET,1412383598"));
}
/**
 * Returns the listing for the given URI, consulting the cache first.
 * <p>
 * On a miss the delegate client is queried and its result, wrapped in an {@code Optional} so
 * that a null listing can be cached too, is stored before being returned.
 *
 * @param s3uri location to list
 * @return the cached or freshly fetched listing; may be {@code null}
 */
@Override
public ObjectListing listObjects(S3URI s3uri) {
    Optional<ObjectListing> cached = objectListingCache.getIfPresent(s3uri);
    if (cached != null) {
        logger.debug(String.format("ObjectListing cache hit: '%s'", s3uri));
        return cached.orNull();
    }

    logger.debug(String.format("ObjectListing cache MISS: '%s'", s3uri));
    Optional<ObjectListing> fetched = Optional.fromNullable(threddsS3Client.listObjects(s3uri));
    objectListingCache.put(s3uri, fetched);
    return fetched.orNull();
}
/**
 * Fetches the current object summaries of the configured S3 "directory".
 * <p>
 * The bucket URL is split into bucket and key; the key plus "/" is used as prefix with "/"
 * as delimiter, and pages of at most 25 keys are requested until the listing is no longer
 * truncated. If the client raises an {@code AmazonClientException} the error is logged and
 * whatever was collected so far is returned.
 *
 * @return the collected object summaries
 */
private List<S3ObjectSummary> getObjectSummaries()
{
    AmazonS3Client s3client = clientManager.getS3Client();
    AmazonS3URI directoryURI = new AmazonS3URI(bucketUrl.get());

    List<S3ObjectSummary> summaries = new ArrayList<>();
    try {
        log.info("Getting the listing of objects in the S3 table config directory: bucket %s prefix %s :", directoryURI.getBucket(), directoryURI.getKey());
        ListObjectsRequest listRequest = new ListObjectsRequest()
                .withBucketName(directoryURI.getBucket())
                .withPrefix(directoryURI.getKey() + "/")
                .withDelimiter("/")
                .withMaxKeys(25);

        boolean morePages = true;
        while (morePages) {
            ObjectListing page = s3client.listObjects(listRequest);
            summaries.addAll(page.getObjectSummaries());
            // Continue the next request from where this page ended.
            listRequest.setMarker(page.getNextMarker());
            morePages = page.isTruncated();
        }
        log.info("Completed getting S3 object listing.");
    }
    catch (AmazonClientException e) {
        log.error("Skipping update as faced error fetching table descriptions from S3 " + e.toString());
    }
    return summaries;
}
/**
 * Recovery path for a failed attach: retries the lookup treating the key as a folder.
 * <p>
 * First the key with a trailing delimiter is fetched; success marks this object as a folder.
 * If that fetch fails with an {@code AmazonS3Exception}, the bucket is listed under the same
 * prefix and any content also marks it as a folder. With no content, an error code of
 * {@code "NoSuchKey"} is tolerated silently, while anything else (including a missing error
 * code) is logged and rethrown as a {@link FileSystemException}.
 *
 * @param key    key that could not be attached
 * @param bucket bucket the key belongs to
 * @throws FileSystemException when the failure was not a plain "NoSuchKey"
 */
@Override
public void handleAttachException( String key, String bucket ) throws FileSystemException {
  SimpleEntry<String, String> newPath = fixFilePath( key, bucket );
  String keyWithDelimiter = newPath.getKey() + DELIMITER;
  try {
    s3Object = getS3Object( keyWithDelimiter, newPath.getValue() );
    s3ObjectMetadata = s3Object.getObjectMetadata();
    injectType( FileType.FOLDER );
  } catch ( AmazonS3Exception e2 ) {
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
        .withBucketName( newPath.getValue() )
        .withPrefix( keyWithDelimiter )
        .withDelimiter( DELIMITER );
    ObjectListing ol = fileSystem.getS3Client().listObjects( listObjectsRequest );
    if ( !( ol.getCommonPrefixes().isEmpty() && ol.getObjectSummaries().isEmpty() ) ) {
      injectType( FileType.FOLDER );
    } else {
      //Folders don't really exist - they will generate a "NoSuchKey" exception
      String errorCode = e2.getErrorCode();
      // confirms key doesn't exist but connection okay
      // Constant-first comparison: a null error code must surface as a FileSystemException,
      // not as a NullPointerException that hides the original failure.
      if ( !"NoSuchKey".equals( errorCode ) ) {
        // bubbling up other connection errors
        logger.error( "Could not get information on " + getQualifiedName(),
          e2 ); // make sure this gets printed for the user
        throw new FileSystemException( "vfs.provider/get-type.error", getQualifiedName(), e2 );
      }
    }
  }
}
/**
 * Appends to {@code uris} the URI of every object in the listing whose key does not end with
 * "/" and whose resolved URI passes the predicate.
 *
 * @param uris         accumulator that receives the accepted URIs
 * @param list         listing page to inspect
 * @param uri          base URI against which "/" + key is resolved
 * @param uriPredicate filter applied to each resolved URI
 */
private void addKeyUris(List<URI> uris, ObjectListing list, URI uri, Predicate<URI> uriPredicate) {
    for (S3ObjectSummary entry : list.getObjectSummaries()) {
        String objectKey = entry.getKey();
        // Keys ending with "/" are skipped.
        if (objectKey.endsWith("/")) {
            continue;
        }
        URI resolved = uri.resolve("/" + objectKey);
        if (!uriPredicate.test(resolved)) {
            continue;
        }
        uris.add(resolved);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{}", resolved);
        }
    }
}
/**
 * With a listing that contains no entries, {@code getLatestPrefix} must return null.
 */
@Test
public void shouldReturnNullWhenObjectListingIsSize0() {
    // The mocked client answers every list request with a freshly constructed listing.
    ObjectListing emptyListing = new ObjectListing();
    doReturn(emptyListing).when(mockClient).listObjects(any(ListObjectsRequest.class));

    S3ArtifactStore artifactStore = new S3ArtifactStore(mockClient, "foo-bar");
    String latestPrefix = artifactStore.getLatestPrefix("pipeline", "stage", "job", "1");

    assertNull(latestPrefix);
}
/**
 * Tells whether a folder marker object (a key equal to {@code folder + "/"}) exists in the bucket.
 * <p>
 * The listing is restricted to the {@code folder + "/"} prefix and every page is followed.
 * Scanning only the first page of the whole bucket would miss the marker as soon as the
 * bucket holds more keys than one page returns.
 *
 * @param folder folder name without the trailing slash
 * @return {@code true} if an object with key {@code folder + "/"} is listed
 * @throws FileSystemException declared by the signature; not raised directly in this body
 */
@Override
public boolean folderExists(String folder) throws FileSystemException {
    final String folderKey = folder + "/";
    ObjectListing objectListing = s3Client.listObjects(bucketName, folderKey);
    while (true) {
        for (S3ObjectSummary s3ObjectSummary : objectListing.getObjectSummaries()) {
            if (folderKey.equals(s3ObjectSummary.getKey())) {
                return true;
            }
        }
        if (!objectListing.isTruncated()) {
            return false;
        }
        objectListing = s3Client.listNextBatchOfObjects(objectListing);
    }
}