下面列出了怎么用com.amazonaws.services.s3.model.ListObjectsV2Request的API类实例代码及写法,或者点击链接到github查看源代码。
private void clearDest(AWSCredentials credentials, RedshiftConnection.UnloadConfig unloadConfig)
{
try {
RetryExecutor.retryExecutor() .run(() -> {
AmazonS3Client s3Client = new AmazonS3Client(credentials);
ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(unloadConfig.s3Bucket).withPrefix(unloadConfig.s3Prefix);
ListObjectsV2Result result;
// This operation shouldn't be skipped since remaining files created by other operation can cause duplicated data
do {
result = s3Client.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
s3Client.deleteObject(unloadConfig.s3Bucket, objectSummary.getKey());
}
req.setContinuationToken(result.getNextContinuationToken());
} while (result.isTruncated());
});
}
catch (RetryExecutor.RetryGiveupException e) {
throw Throwables.propagate(e);
}
}
/**
* Calls DescribeDBInstances on the AWS RDS Client returning all DB Instances that match the supplied predicate and attempting
* to push down certain predicates (namely queries for specific DB Instance) to EC2.
*
* @See TableProvider
*/
@Override
public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker)
{
ValueSet bucketConstraint = recordsRequest.getConstraints().getSummary().get("bucket_name");
String bucket;
if (bucketConstraint != null && bucketConstraint.isSingleValue()) {
bucket = bucketConstraint.getSingleValue().toString();
}
else {
throw new IllegalArgumentException("Queries against the objects table must filter on a single bucket " +
"(e.g. where bucket_name='my_bucket'.");
}
ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucket).withMaxKeys(MAX_KEYS);
ListObjectsV2Result result;
do {
result = amazonS3.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
toRow(objectSummary, spiller);
}
req.setContinuationToken(result.getNextContinuationToken());
}
while (result.isTruncated() && queryStatusChecker.isQueryRunning());
}
private Result getResult() {
return _amazonS3Provider.getS3BucketNamesToS3Clients().entrySet().parallelStream()
.map(entry -> {
try {
entry.getValue().listObjectsV2(new ListObjectsV2Request()
.withBucketName(entry.getKey())
.withMaxKeys(1));
return Result.healthy("Bucket " + entry.getKey() + " is healthy");
} catch (Exception e) {
return Result.unhealthy("Bucket " + entry.getKey() + " is unhealthy: " + e.getMessage());
}
}).reduce((result1, result2) -> {
String message = result1.getMessage() + ", " + result2.getMessage();
if (result1.isHealthy() && result2.isHealthy()) {
return Result.healthy(message);
} else {
return Result.unhealthy(message);
}
})
.orElse(Result.healthy("No s3 buckets configured"));
}
protected Collection<String> getSiteListFromBucketKeys(String bucketName, String rootPrefix) {
List<String> siteNames = new ArrayList<>();
AmazonS3 client = clientBuilder.getClient();
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(bucketName)
.withPrefix(rootPrefix)
.withDelimiter(DELIMITER);
ListObjectsV2Result result = client.listObjectsV2(request);
if(CollectionUtils.isNotEmpty(result.getCommonPrefixes())) {
result.getCommonPrefixes()
.stream()
.map(prefix -> StringUtils.stripEnd(StringUtils.removeStart(prefix, rootPrefix), DELIMITER))
.forEach(siteNames::add);
}
return siteNames;
}
@Override
protected void setUpRead()
{
AtomicLong count = new AtomicLong(0);
when(mockS3.listObjectsV2(any(ListObjectsV2Request.class))).thenAnswer((InvocationOnMock invocation) -> {
ListObjectsV2Request request = (ListObjectsV2Request) invocation.getArguments()[0];
assertEquals(getIdValue(), request.getBucketName());
ListObjectsV2Result mockResult = mock(ListObjectsV2Result.class);
List<S3ObjectSummary> values = new ArrayList<>();
values.add(makeObjectSummary(getIdValue()));
values.add(makeObjectSummary(getIdValue()));
values.add(makeObjectSummary("fake-id"));
when(mockResult.getObjectSummaries()).thenReturn(values);
if (count.get() > 0) {
assertNotNull(request.getContinuationToken());
}
if (count.incrementAndGet() < 2) {
when(mockResult.isTruncated()).thenReturn(true);
when(mockResult.getNextContinuationToken()).thenReturn("token");
}
return mockResult;
});
}
private Iterator<LocatedFileStatus> listPrefix(Path path)
{
String key = keyFromPath(path);
if (!key.isEmpty()) {
key += PATH_SEPARATOR;
}
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(getBucketName(uri))
.withPrefix(key)
.withDelimiter(PATH_SEPARATOR)
.withRequesterPays(requesterPaysEnabled);
STATS.newListObjectsCall();
Iterator<ListObjectsV2Result> listings = new AbstractSequentialIterator<ListObjectsV2Result>(s3.listObjectsV2(request))
{
@Override
protected ListObjectsV2Result computeNext(ListObjectsV2Result previous)
{
if (!previous.isTruncated()) {
return null;
}
request.setContinuationToken(previous.getNextContinuationToken());
return s3.listObjectsV2(request);
}
};
return Iterators.concat(Iterators.transform(listings, this::statusFromListing));
}
@Override
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request)
{
final String continuationToken = "continue";
ListObjectsV2Result listingV2 = new ListObjectsV2Result();
if (continuationToken.equals(listObjectsV2Request.getContinuationToken())) {
S3ObjectSummary standardTwo = new S3ObjectSummary();
standardTwo.setStorageClass(StorageClass.Standard.toString());
standardTwo.setKey("test/standardTwo");
standardTwo.setLastModified(new Date());
listingV2.getObjectSummaries().add(standardTwo);
if (hasGlacierObjects) {
S3ObjectSummary glacier = new S3ObjectSummary();
glacier.setStorageClass(StorageClass.Glacier.toString());
glacier.setKey("test/glacier");
glacier.setLastModified(new Date());
listingV2.getObjectSummaries().add(glacier);
}
}
else {
S3ObjectSummary standardOne = new S3ObjectSummary();
standardOne.setStorageClass(StorageClass.Standard.toString());
standardOne.setKey("test/standardOne");
standardOne.setLastModified(new Date());
listingV2.getObjectSummaries().add(standardOne);
listingV2.setTruncated(true);
listingV2.setNextContinuationToken(continuationToken);
}
return listingV2;
}
/**
* List keys.
*
* @param s3client the s 3 client
* @param s3Bucket the s 3 bucket
* @param folder the folder
* @return the string[]
*/
private String[] listKeys(AmazonS3 s3client,String s3Bucket,String folder){
try{
return s3client.listObjectsV2(new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(folder)).getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
}catch(Exception e){
log.error("Error in listKeys",e);
ErrorManageUtil.uploadError("all", "all", "all", e.getMessage());
}
return new String[0];
}
/**
* List keys.
*
* @param s3client the s 3 client
* @param s3Bucket the s 3 bucket
* @param folder the folder
* @return the string[]
*/
private String[] listKeys(AmazonS3 s3client,String s3Bucket,String folder){
try{
return s3client.listObjectsV2(new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(folder)).getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
}catch(Exception e){
log.error("Error in listKeys",e);
ErrorManageUtil.uploadError("all", "all", "all", e.getMessage());
}
return new String[0];
}
@Override
public boolean matches(ListObjectsV2Request argument) {
if (argument instanceof ListObjectsV2Request) {
ListObjectsV2Request actual = (ListObjectsV2Request) argument;
return expected.getBucketName().equals(actual.getBucketName())
&& expected.getPrefix().equals(actual.getPrefix())
&& (expected.getContinuationToken() == null
? actual.getContinuationToken() == null
: expected.getContinuationToken().equals(actual.getContinuationToken()));
}
return false;
}
/**
* Execute.
*
* @param dataSource the data source
* @param type the type
* @return the list
*/
public List<Map<String, String>> uploadAssociationInfo(String dataSource,String type) {
LOGGER.info("Started EntityAssociationDataCollector for {}",type);
List<Map<String,String>> errorList = new ArrayList<>();
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(
new AWSStaticCredentialsProvider(new CredentialProvider().getCredentials(s3Account,s3Role))).withRegion(s3Region).build();
ObjectMapper objectMapper = new ObjectMapper();
try {
String indexName = dataSource + "_" + type;
String filePrefix = dataSource+"-"+type+"-";
List<String> childTypes = new ArrayList<>();
for (S3ObjectSummary objectSummary : s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName).withPrefix(dataPath+"/"+filePrefix)).getObjectSummaries()) {
String fileName = objectSummary.getKey().replace(dataPath+"/", "").replace(".data", "");
if(fileName.chars().filter(ch -> ch == '-').count() == 2) {
childTypes.add(fileName.replace(filePrefix,""));
}
}
String key = ConfigManager.getKeyForType(dataSource, type);
if (!childTypes.isEmpty()) {
for (String childType : childTypes) {
String childTypeES = type+"_"+childType;
if (!childType.equalsIgnoreCase("tags")) {
ESManager.createType(indexName, childTypeES, type);
LOGGER.info("Fetching data for {}", childTypeES);
List<Map<String, Object>> entities = new ArrayList<>();
S3Object entitiesData = s3Client.getObject(new GetObjectRequest(bucketName, dataPath+"/"+filePrefix+childType+".data"));
try (BufferedReader reader = new BufferedReader(new InputStreamReader(entitiesData.getObjectContent()))) {
entities = objectMapper.readValue(reader.lines().collect(Collectors.joining("\n")),new TypeReference<List<Map<String, Object>>>() {});
}
String loaddate = new SimpleDateFormat("yyyy-MM-dd HH:mm:00Z").format(new java.util.Date());
entities.parallelStream().forEach(obj -> obj.put("_loaddate", loaddate));
LOGGER.info("Collected : {}", entities.size());
if (!entities.isEmpty()) {
ErrorManager.getInstance(dataSource).handleError(indexName, childTypeES, loaddate, errorList,false);
ESManager.uploadData(indexName, childTypeES, entities, key.split(","));
ESManager.deleteOldDocuments(indexName, childTypeES, "_loaddate.keyword",
loaddate);
}
}
}
}
} catch (Exception e) {
LOGGER.error("Error in populating child tables", e);
Map<String,String> errorMap = new HashMap<>();
errorMap.put(ERROR, "Error in populating child tables");
errorMap.put(ERROR_TYPE, WARN);
errorMap.put(EXCEPTION, e.getMessage());
errorList.add(errorMap);
}
LOGGER.info("Completed EntityAssociationDataCollector for {}",type);
return errorList;
}
List<String> getKeys(String prefix, int limit, String after) {
if (limit < 0) {
throw new IllegalArgumentException("limit must not be negative: " + limit);
}
if (limit == 0) {
return List.of();
}
if (after == null) {
throw new IllegalArgumentException("after must not be null");
}
LOG.log(INFO, "Get keys from {0} bucket (limit={1}, after={2})...", bucketName, limit, after);
var keys = new ArrayList<String>();
var bytes = 0L;
var request = new ListObjectsV2Request().withBucketName(bucketName);
if (!after.isBlank()) {
LOG.log(INFO, "Set start after to: {0}", after);
request.setStartAfter(after);
}
while (true) {
var pendingKeys = limit - keys.size();
if (pendingKeys <= 0) {
LOG.log(DEBUG, "No more keys are pending: done.");
break;
}
request.setMaxKeys(Math.min(pendingKeys, MAX_KEYS_PER_PAGE));
request.setDelimiter("/");
request.setPrefix(prefix);
LOG.log(INFO, "Get objects list... (max={0})", request.getMaxKeys());
var objects = s3.listObjectsV2(request);
var summaries = objects.getObjectSummaries();
for (var summary : summaries) {
LOG.log(DEBUG, " o {0} (bytes: {1})", summary.getKey(), summary.getSize());
keys.add(summary.getKey());
bytes += summary.getSize();
}
LOG.log(INFO, " - {0} objects retrieved", keys.size());
if (!objects.isTruncated()) {
LOG.log(DEBUG, "Objects result is not truncated: done.");
break;
}
request.setContinuationToken(objects.getNextContinuationToken());
}
LOG.log(INFO, "Got {0} keys (bytes: {1})", keys.size(), bytes);
return keys;
}
private ExpandedGlob expandGlob(S3ResourceId glob) {
// The S3 API can list objects, filtered by prefix, but not by wildcard.
// Here, we find the longest prefix without wildcard "*",
// then filter the results with a regex.
checkArgument(glob.isWildcard(), "isWildcard");
String keyPrefix = glob.getKeyNonWildcardPrefix();
Pattern wildcardRegexp = Pattern.compile(wildcardToRegexp(glob.getKey()));
LOG.debug(
"expanding bucket {}, prefix {}, against pattern {}",
glob.getBucket(),
keyPrefix,
wildcardRegexp.toString());
ImmutableList.Builder<S3ResourceId> expandedPaths = ImmutableList.builder();
String continuationToken = null;
do {
ListObjectsV2Request request =
new ListObjectsV2Request()
.withBucketName(glob.getBucket())
.withPrefix(keyPrefix)
.withContinuationToken(continuationToken);
ListObjectsV2Result result;
try {
result = amazonS3.get().listObjectsV2(request);
} catch (AmazonClientException e) {
return ExpandedGlob.create(glob, new IOException(e));
}
continuationToken = result.getNextContinuationToken();
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
// Filter against regex.
if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) {
S3ResourceId expandedPath =
S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey())
.withSize(objectSummary.getSize())
.withLastModified(objectSummary.getLastModified());
LOG.debug("Expanded S3 object path {}", expandedPath);
expandedPaths.add(expandedPath);
}
}
} while (continuationToken != null);
return ExpandedGlob.create(glob, expandedPaths.build());
}
ListObjectsV2RequestArgumentMatches(ListObjectsV2Request expected) {
this.expected = checkNotNull(expected);
}
@Test
public void matchGlob() throws IOException {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar*baz");
ListObjectsV2Request firstRequest =
new ListObjectsV2Request()
.withBucketName(path.getBucket())
.withPrefix(path.getKeyNonWildcardPrefix())
.withContinuationToken(null);
// Expected to be returned; prefix and wildcard/regex match
S3ObjectSummary firstMatch = new S3ObjectSummary();
firstMatch.setBucketName(path.getBucket());
firstMatch.setKey("foo/bar0baz");
firstMatch.setSize(100);
firstMatch.setLastModified(new Date(1540000000001L));
// Expected to not be returned; prefix matches, but substring after wildcard does not
S3ObjectSummary secondMatch = new S3ObjectSummary();
secondMatch.setBucketName(path.getBucket());
secondMatch.setKey("foo/bar1qux");
secondMatch.setSize(200);
secondMatch.setLastModified(new Date(1540000000002L));
// Expected first request returns continuation token
ListObjectsV2Result firstResult = new ListObjectsV2Result();
firstResult.setNextContinuationToken("token");
firstResult.getObjectSummaries().add(firstMatch);
firstResult.getObjectSummaries().add(secondMatch);
when(s3FileSystem
.getAmazonS3Client()
.listObjectsV2(argThat(new ListObjectsV2RequestArgumentMatches(firstRequest))))
.thenReturn(firstResult);
// Expect second request with continuation token
ListObjectsV2Request secondRequest =
new ListObjectsV2Request()
.withBucketName(path.getBucket())
.withPrefix(path.getKeyNonWildcardPrefix())
.withContinuationToken("token");
// Expected to be returned; prefix and wildcard/regex match
S3ObjectSummary thirdMatch = new S3ObjectSummary();
thirdMatch.setBucketName(path.getBucket());
thirdMatch.setKey("foo/bar2baz");
thirdMatch.setSize(300);
thirdMatch.setLastModified(new Date(1540000000003L));
// Expected second request returns third prefix match and no continuation token
ListObjectsV2Result secondResult = new ListObjectsV2Result();
secondResult.setNextContinuationToken(null);
secondResult.getObjectSummaries().add(thirdMatch);
when(s3FileSystem
.getAmazonS3Client()
.listObjectsV2(argThat(new ListObjectsV2RequestArgumentMatches(secondRequest))))
.thenReturn(secondResult);
// Expect object metadata queries for content encoding
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentEncoding("");
when(s3FileSystem.getAmazonS3Client().getObjectMetadata(anyObject())).thenReturn(metadata);
assertThat(
s3FileSystem.matchGlobPaths(ImmutableList.of(path)).get(0),
MatchResultMatcher.create(
ImmutableList.of(
MatchResult.Metadata.builder()
.setIsReadSeekEfficient(true)
.setResourceId(
S3ResourceId.fromComponents(
firstMatch.getBucketName(), firstMatch.getKey()))
.setSizeBytes(firstMatch.getSize())
.setLastModifiedMillis(firstMatch.getLastModified().getTime())
.build(),
MatchResult.Metadata.builder()
.setIsReadSeekEfficient(true)
.setResourceId(
S3ResourceId.fromComponents(
thirdMatch.getBucketName(), thirdMatch.getKey()))
.setSizeBytes(thirdMatch.getSize())
.setLastModifiedMillis(thirdMatch.getLastModified().getTime())
.build())));
}
@Test
public void matchGlobWithSlashes() throws IOException {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar\\baz*");
ListObjectsV2Request request =
new ListObjectsV2Request()
.withBucketName(path.getBucket())
.withPrefix(path.getKeyNonWildcardPrefix())
.withContinuationToken(null);
// Expected to be returned; prefix and wildcard/regex match
S3ObjectSummary firstMatch = new S3ObjectSummary();
firstMatch.setBucketName(path.getBucket());
firstMatch.setKey("foo/bar\\baz0");
firstMatch.setSize(100);
firstMatch.setLastModified(new Date(1540000000001L));
// Expected to not be returned; prefix matches, but substring after wildcard does not
S3ObjectSummary secondMatch = new S3ObjectSummary();
secondMatch.setBucketName(path.getBucket());
secondMatch.setKey("foo/bar/baz1");
secondMatch.setSize(200);
secondMatch.setLastModified(new Date(1540000000002L));
// Expected first request returns continuation token
ListObjectsV2Result result = new ListObjectsV2Result();
result.getObjectSummaries().add(firstMatch);
result.getObjectSummaries().add(secondMatch);
when(s3FileSystem
.getAmazonS3Client()
.listObjectsV2(argThat(new ListObjectsV2RequestArgumentMatches(request))))
.thenReturn(result);
// Expect object metadata queries for content encoding
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentEncoding("");
when(s3FileSystem.getAmazonS3Client().getObjectMetadata(anyObject())).thenReturn(metadata);
assertThat(
s3FileSystem.matchGlobPaths(ImmutableList.of(path)).get(0),
MatchResultMatcher.create(
ImmutableList.of(
MatchResult.Metadata.builder()
.setIsReadSeekEfficient(true)
.setResourceId(
S3ResourceId.fromComponents(
firstMatch.getBucketName(), firstMatch.getKey()))
.setSizeBytes(firstMatch.getSize())
.setLastModifiedMillis(firstMatch.getLastModified().getTime())
.build())));
}
@Test
public void matchVariousInvokeThreadPool() throws IOException {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
AmazonS3Exception notFoundException = new AmazonS3Exception("mock exception");
notFoundException.setStatusCode(404);
S3ResourceId pathNotExist =
S3ResourceId.fromUri("s3://testbucket/testdirectory/nonexistentfile");
when(s3FileSystem
.getAmazonS3Client()
.getObjectMetadata(
argThat(
new GetObjectMetadataRequestMatcher(
new GetObjectMetadataRequest(
pathNotExist.getBucket(), pathNotExist.getKey())))))
.thenThrow(notFoundException);
AmazonS3Exception forbiddenException = new AmazonS3Exception("mock exception");
forbiddenException.setStatusCode(403);
S3ResourceId pathForbidden =
S3ResourceId.fromUri("s3://testbucket/testdirectory/forbiddenfile");
when(s3FileSystem
.getAmazonS3Client()
.getObjectMetadata(
argThat(
new GetObjectMetadataRequestMatcher(
new GetObjectMetadataRequest(
pathForbidden.getBucket(), pathForbidden.getKey())))))
.thenThrow(forbiddenException);
S3ResourceId pathExist = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
s3ObjectMetadata.setContentLength(100);
s3ObjectMetadata.setLastModified(new Date(1540000000000L));
s3ObjectMetadata.setContentEncoding("not-gzip");
when(s3FileSystem
.getAmazonS3Client()
.getObjectMetadata(
argThat(
new GetObjectMetadataRequestMatcher(
new GetObjectMetadataRequest(pathExist.getBucket(), pathExist.getKey())))))
.thenReturn(s3ObjectMetadata);
S3ResourceId pathGlob = S3ResourceId.fromUri("s3://testbucket/path/part*");
S3ObjectSummary foundListObject = new S3ObjectSummary();
foundListObject.setBucketName(pathGlob.getBucket());
foundListObject.setKey("path/part-0");
foundListObject.setSize(200);
foundListObject.setLastModified(new Date(1541000000000L));
ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
listObjectsResult.setNextContinuationToken(null);
listObjectsResult.getObjectSummaries().add(foundListObject);
when(s3FileSystem.getAmazonS3Client().listObjectsV2(notNull(ListObjectsV2Request.class)))
.thenReturn(listObjectsResult);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentEncoding("");
when(s3FileSystem
.getAmazonS3Client()
.getObjectMetadata(
argThat(
new GetObjectMetadataRequestMatcher(
new GetObjectMetadataRequest(pathGlob.getBucket(), "path/part-0")))))
.thenReturn(metadata);
assertThat(
s3FileSystem.match(
ImmutableList.of(
pathNotExist.toString(),
pathForbidden.toString(),
pathExist.toString(),
pathGlob.toString())),
contains(
MatchResultMatcher.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException()),
MatchResultMatcher.create(
MatchResult.Status.ERROR, new IOException(forbiddenException)),
MatchResultMatcher.create(100, 1540000000000L, pathExist, true),
MatchResultMatcher.create(
200,
1541000000000L,
S3ResourceId.fromComponents(pathGlob.getBucket(), foundListObject.getKey()),
true)));
}
private void assertS3Contents(List<Map<String, Object>> expected)
throws IOException
{
ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(s3ParentKey);
ListObjectsV2Result result;
List<String> lines = new ArrayList<>();
do {
result = s3Client.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
if (objectSummary.getKey().endsWith("_part_00")) {
try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(
s3Client.getObject(
objectSummary.getBucketName(),
objectSummary.getKey()).getObjectContent()))) {
lines.addAll(reader.lines().collect(Collectors.toList()));
}
}
else {
assertThat(objectSummary.getKey(), endsWith("_manifest"));
}
try {
s3Client.deleteObject(objectSummary.getBucketName(), objectSummary.getKey());
}
catch (Exception e) {
logger.warn("Failed to delete S3 object: bucket={}, key={}", s3Bucket, objectSummary.getKey(), e);
}
}
req.setContinuationToken(result.getNextContinuationToken());
} while (result.isTruncated());
List<ImmutableMap<String, ? extends Serializable>> actual =
lines.stream()
.map(
l -> {
String[] values = l.split("\\|");
assertThat(values.length, is(3));
return ImmutableMap.of(
"id", Integer.valueOf(values[0]),
"name", values[1],
"score", Float.valueOf(values[2]));
}
)
.sorted((o1, o2) -> ((Integer)o1.get("id")) - ((Integer) o2.get("id")))
.collect(Collectors.toList());
assertThat(actual, is(expected));
}
/** Unsupported Operation. */
@Override public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Req) throws SdkClientException {
throw new UnsupportedOperationException("Operation not supported");
}
@Test
public void testBlobListV2() throws Exception {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(BYTE_SOURCE.size());
for (int i = 1; i < 5; ++i) {
client.putObject(containerName, String.valueOf(i),
BYTE_SOURCE.openStream(), metadata);
}
ListObjectsV2Result result = client.listObjectsV2(
new ListObjectsV2Request()
.withBucketName(containerName)
.withMaxKeys(1)
.withStartAfter("1"));
assertThat(result.getContinuationToken()).isEmpty();
assertThat(result.getStartAfter()).isEqualTo("1");
assertThat(result.getNextContinuationToken()).isEqualTo("2");
assertThat(result.isTruncated()).isTrue();
assertThat(result.getObjectSummaries()).hasSize(1);
assertThat(result.getObjectSummaries().get(0).getKey()).isEqualTo("2");
result = client.listObjectsV2(
new ListObjectsV2Request()
.withBucketName(containerName)
.withMaxKeys(1)
.withContinuationToken(result.getNextContinuationToken()));
assertThat(result.getContinuationToken()).isEqualTo("2");
assertThat(result.getStartAfter()).isEmpty();
assertThat(result.getNextContinuationToken()).isEqualTo("3");
assertThat(result.isTruncated()).isTrue();
assertThat(result.getObjectSummaries()).hasSize(1);
assertThat(result.getObjectSummaries().get(0).getKey()).isEqualTo("3");
result = client.listObjectsV2(
new ListObjectsV2Request()
.withBucketName(containerName)
.withMaxKeys(1)
.withContinuationToken(result.getNextContinuationToken()));
assertThat(result.getContinuationToken()).isEqualTo("3");
assertThat(result.getStartAfter()).isEmpty();
assertThat(result.getNextContinuationToken()).isNull();
assertThat(result.isTruncated()).isFalse();
assertThat(result.getObjectSummaries()).hasSize(1);
assertThat(result.getObjectSummaries().get(0).getKey()).isEqualTo("4");
}
@Override
public void setBucketName(String bucketName) {
listObjectsRequest = new ListObjectsV2Request().withBucketName(bucketName);
}
@Test
public void testListVersion2() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
runner.setProperty(ListS3.LIST_TYPE, "2");
Date lastModified = new Date();
ListObjectsV2Result objectListing = new ListObjectsV2Result();
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
objectSummary1.setBucketName("test-bucket");
objectSummary1.setKey("a");
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
S3ObjectSummary objectSummary2 = new S3ObjectSummary();
objectSummary2.setBucketName("test-bucket");
objectSummary2.setKey("b/c");
objectSummary2.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary2);
S3ObjectSummary objectSummary3 = new S3ObjectSummary();
objectSummary3.setBucketName("test-bucket");
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsV2Request> captureRequest = ArgumentCaptor.forClass(ListObjectsV2Request.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
ListObjectsV2Request request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "a");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
}
@Test
public void testListVersion2WithRequesterPays() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
runner.setProperty(ListS3.LIST_TYPE, "2");
Date lastModified = new Date();
ListObjectsV2Result objectListing = new ListObjectsV2Result();
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
objectSummary1.setBucketName("test-bucket");
objectSummary1.setKey("a");
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
S3ObjectSummary objectSummary2 = new S3ObjectSummary();
objectSummary2.setBucketName("test-bucket");
objectSummary2.setKey("b/c");
objectSummary2.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary2);
S3ObjectSummary objectSummary3 = new S3ObjectSummary();
objectSummary3.setBucketName("test-bucket");
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsV2Request> captureRequest = ArgumentCaptor.forClass(ListObjectsV2Request.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
ListObjectsV2Request request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertTrue(request.isRequesterPays());
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "a");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
}