下面列出了 com.amazonaws.services.s3.model.ListObjectsV2Request#setContinuationToken() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Lists the objects of a single S3 bucket through paginated ListObjectsV2 calls on the AWS S3 client and
 * hands every returned object summary to {@code toRow} together with the supplied spiller.
 * <p>
 * The request's constraint summary must restrict {@code bucket_name} to exactly one value; that value is the
 * bucket that gets listed. Paging stops as soon as a page is not truncated or the query is no longer running.
 *
 * @param spiller passed to {@code toRow} along with each object summary
 * @param recordsRequest source of the constraint summary from which the {@code bucket_name} filter is read
 * @param queryStatusChecker consulted after each page so that no further pages are requested once the query stops
 * @throws IllegalArgumentException if {@code bucket_name} is unconstrained or not constrained to a single value
 * @see TableProvider
 */
@Override
public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker)
{
    ValueSet bucketConstraint = recordsRequest.getConstraints().getSummary().get("bucket_name");
    // Guard clause: without exactly one bucket there is nothing well-defined to list.
    if (bucketConstraint == null || !bucketConstraint.isSingleValue()) {
        throw new IllegalArgumentException("Queries against the objects table must filter on a single bucket " +
                "(e.g. where bucket_name='my_bucket').");
    }
    String bucket = bucketConstraint.getSingleValue().toString();

    ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucket).withMaxKeys(MAX_KEYS);
    ListObjectsV2Result result;
    do {
        result = amazonS3.listObjectsV2(req);
        for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
            toRow(objectSummary, spiller);
        }
        // The same request object is reused; only the continuation token advances between pages.
        req.setContinuationToken(result.getNextContinuationToken());
    }
    while (result.isTruncated() && queryStatusChecker.isQueryRunning());
}
/**
 * Issues ListObjectsV2 requests for the key derived from {@code path} and exposes the results as a single
 * iterator of file statuses.
 * <p>
 * A non-empty key gets {@code PATH_SEPARATOR} appended before being used as the prefix, and
 * {@code PATH_SEPARATOR} is also used as the listing delimiter. The first page is requested while this
 * method runs; each following page is requested only when the previous one reported that it was truncated.
 *
 * @param path location whose key (via {@code keyFromPath}) becomes the listing prefix
 * @return the concatenation of {@code statusFromListing} applied to every listing page
 */
private Iterator<LocatedFileStatus> listPrefix(Path path)
{
    String rawKey = keyFromPath(path);
    String prefix = rawKey.isEmpty() ? rawKey : rawKey + PATH_SEPARATOR;

    final ListObjectsV2Request listRequest = new ListObjectsV2Request()
            .withBucketName(getBucketName(uri))
            .withPrefix(prefix)
            .withDelimiter(PATH_SEPARATOR)
            .withRequesterPays(requesterPaysEnabled);

    STATS.newListObjectsCall();
    ListObjectsV2Result firstPage = s3.listObjectsV2(listRequest);

    Iterator<ListObjectsV2Result> pages = new AbstractSequentialIterator<ListObjectsV2Result>(firstPage)
    {
        @Override
        protected ListObjectsV2Result computeNext(ListObjectsV2Result lastPage)
        {
            if (lastPage.isTruncated()) {
                // Reuse the request, advancing only the continuation token.
                listRequest.setContinuationToken(lastPage.getNextContinuationToken());
                return s3.listObjectsV2(listRequest);
            }
            // Returning null ends the sequence.
            return null;
        }
    };

    return Iterators.concat(
            Iterators.transform(pages, this::statusFromListing));
}
/**
 * Asserts that the objects under {@code s3ParentKey} in {@code s3Bucket} contain exactly the expected rows,
 * deleting each object after it has been inspected.
 * <p>
 * Every listed object whose key ends with {@code _part_00} is read line by line; any other object must have
 * a key ending with {@code _manifest}. Each collected line is split on {@code |} into exactly three fields
 * (id, name, score), the resulting rows are sorted by id, and the sorted list is compared to {@code expected}.
 * Failures to delete an object are logged and otherwise ignored.
 *
 * @param expected the rows, ordered by id, that the data files are expected to contain
 * @throws IOException if closing the reader over an object's content fails
 */
private void assertS3Contents(List<Map<String, Object>> expected)
        throws IOException
{
    ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(s3ParentKey);
    ListObjectsV2Result result;
    List<String> lines = new ArrayList<>();
    do {
        result = s3Client.listObjectsV2(req);
        for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
            if (objectSummary.getKey().endsWith("_part_00")) {
                // Decode with an explicit charset rather than the platform default so the result does not
                // depend on the machine running the test.
                // NOTE(review): assumes the data files are UTF-8 encoded — confirm against the producer.
                try (BufferedReader reader =
                        new BufferedReader(
                                new InputStreamReader(
                                        s3Client.getObject(
                                                objectSummary.getBucketName(),
                                                objectSummary.getKey()).getObjectContent(),
                                        java.nio.charset.StandardCharsets.UTF_8))) {
                    lines.addAll(reader.lines().collect(Collectors.toList()));
                }
            }
            else {
                assertThat(objectSummary.getKey(), endsWith("_manifest"));
            }
            // Best-effort cleanup: a failed delete must not fail the assertion run.
            try {
                s3Client.deleteObject(objectSummary.getBucketName(), objectSummary.getKey());
            }
            catch (Exception e) {
                logger.warn("Failed to delete S3 object: bucket={}, key={}", s3Bucket, objectSummary.getKey(), e);
            }
        }
        req.setContinuationToken(result.getNextContinuationToken());
    } while (result.isTruncated());
    List<ImmutableMap<String, ? extends Serializable>> actual =
            lines.stream()
                    .map(
                            l -> {
                                String[] values = l.split("\\|");
                                assertThat(values.length, is(3));
                                return ImmutableMap.of(
                                        "id", Integer.valueOf(values[0]),
                                        "name", values[1],
                                        "score", Float.valueOf(values[2]));
                            }
                    )
                    // Integer.compare instead of subtraction: subtracting ids can overflow and mis-order rows.
                    .sorted((o1, o2) -> Integer.compare((Integer) o1.get("id"), (Integer) o2.get("id")))
                    .collect(Collectors.toList());
    assertThat(actual, is(expected));
}