下面列出了 com.amazonaws.services.dynamodbv2.model.ScanRequest API 类的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Runs a single Scan call against {@code tableName} through the retry driver.
 *
 * @param tableName table to scan
 * @param dynamoDBQueryFilter optional filter; its scan conditions are attached only when the
 *     filter is non-null and its condition map is non-empty
 * @param segment value passed to {@code ScanRequest#setSegment}
 * @param totalSegments value passed to {@code ScanRequest#setTotalSegments}
 * @param exclusiveStartKey key to resume the scan from (passed through unchanged)
 * @param limit maximum number of items per call; must fit in an {@code int}
 * @param reporter reporter handed to the retry driver
 * @return the retry driver's result wrapping the {@link ScanResult}
 */
public RetryResult<ScanResult> scanTable(
    String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer
    totalSegments, Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
  final ScanRequest scanRequest = new ScanRequest(tableName);
  scanRequest.setExclusiveStartKey(exclusiveStartKey);
  // Ints.checkedCast throws IllegalArgumentException instead of silently truncating the long.
  scanRequest.setLimit(Ints.checkedCast(limit));
  scanRequest.setSegment(segment);
  scanRequest.setTotalSegments(totalSegments);
  scanRequest.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

  if (dynamoDBQueryFilter != null) {
    final Map<String, Condition> conditions = dynamoDBQueryFilter.getScanFilter();
    if (!conditions.isEmpty()) {
      scanRequest.setScanFilter(conditions);
    }
  }

  final Callable<ScanResult> scanCall = new Callable<ScanResult>() {
    @Override
    public ScanResult call() {
      log.debug("Executing DynamoDB scan: " + scanRequest);
      return dynamoDB.scan(scanRequest);
    }
  };
  return getRetryDriver().runWithRetry(scanCall, reporter, PrintCounter.DynamoDBReadThrottle);
}
/**
 * Scans {@code tableName} and prints every item whose {@code Price} is below 100, projecting
 * only Id, Title, ProductCategory and Price. (Despite the method name, the threshold is 100.)
 *
 * <p>DynamoDB reads at most 1 MB per Scan call and applies the filter expression afterwards, so a
 * single call can silently miss matching items. The scan is therefore continued from each page's
 * {@code LastEvaluatedKey} until no key is returned.
 */
private static void findProductsForPriceLessThanZero() {
    Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
    expressionAttributeValues.put(":pr", new AttributeValue().withN("100"));
    ScanRequest scanRequest = new ScanRequest()
        .withTableName(tableName)
        .withFilterExpression("Price < :pr")
        .withExpressionAttributeValues(expressionAttributeValues)
        .withProjectionExpression("Id, Title, ProductCategory, Price");
    System.out.println("Scan of " + tableName + " for items with a price less than 100.");
    Map<String, AttributeValue> lastKey;
    do {
        ScanResult result = client.scan(scanRequest);
        for (Map<String, AttributeValue> item : result.getItems()) {
            System.out.println("");
            printItem(item);
        }
        // Resume the next page where this one stopped; null means the table is exhausted.
        lastKey = result.getLastEvaluatedKey();
        scanRequest.setExclusiveStartKey(lastKey);
    } while (lastKey != null);
}
/**
 * Expands the read into: a single spec element, split into one spec per scan segment, reshuffled
 * across workers, and then read segment by segment.
 *
 * <p>Configuration is validated here, at pipeline-construction time.
 */
@Override
public PCollection<T> expand(PBegin input) {
  checkArgument((getScanRequestFn() != null), "withScanRequestFn() is required");
  checkArgument((getAwsClientsProvider() != null), "withAwsClientsProvider() is required");

  // Build a sample request only to inspect how many segments the user asked for.
  final ScanRequest sampleRequest = getScanRequestFn().apply(null);
  final Integer requestedSegments = sampleRequest.getTotalSegments();
  checkArgument(
      requestedSegments != null && requestedSegments > 0,
      "TotalSegments is required with withScanRequestFn() and greater zero");

  final PCollection<Read<T>> perSegmentSpecs =
      (PCollection<Read<T>>)
          input.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn()));
  perSegmentSpecs.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}));

  final PCollection<T> records =
      (PCollection<T>)
          perSegmentSpecs
              .apply("Reshuffle", Reshuffle.viaRandomKey())
              .apply("Read", ParDo.of(new ReadFn()));
  records.setCoder(getCoder());
  return records;
}
/**
 * Verifies that a scan request without TotalSegments is rejected.
 *
 * <p>The read's validation raises a single combined message ("... and greater zero").
 * {@code thrown.expectMessage} matches by substring, but the {@code assertEquals} in the catch
 * block compares the whole message, so it must expect the full text.
 */
@Test
public void testMissingTotalSegments() {
  final String expectedMessage =
      "TotalSegments is required with withScanRequestFn() and greater zero";
  thrown.expectMessage("TotalSegments is required with withScanRequestFn()");
  pipeline.apply(
      DynamoDBIO.read()
          .withScanRequestFn(
              (SerializableFunction<Void, ScanRequest>) input -> new ScanRequest(tableName))
          .withAwsClientsProvider(
              AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
  try {
    pipeline.run().waitUntilFinish();
    fail("TotalSegments is required with withScanRequestFn()");
  } catch (IllegalArgumentException ex) {
    assertEquals(expectedMessage, ex.getMessage());
  }
}
/**
 * Verifies that a scan request with a negative TotalSegments is rejected with the
 * "required ... and greater zero" message.
 */
@Test
public void testNegativeTotalSegments() {
  final String expectedMessage =
      "TotalSegments is required with withScanRequestFn() and greater zero";
  thrown.expectMessage(expectedMessage);

  final SerializableFunction<Void, ScanRequest> negativeSegmentsFn =
      input -> new ScanRequest(tableName).withTotalSegments(-1);
  pipeline.apply(
      DynamoDBIO.read()
          .withScanRequestFn(negativeSegmentsFn)
          .withAwsClientsProvider(
              AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));

  try {
    pipeline.run().waitUntilFinish();
    fail("withTotalSegments() is expected and greater than zero");
  } catch (IllegalArgumentException ex) {
    assertEquals(expectedMessage, ex.getMessage());
  }
}
/**
 * Returns a textual dump of up to 1000 items from the {@code USER_TABLE} table.
 *
 * <p>Each returned string concatenates every attribute of one item as
 * {@code " ** <name> = <string value>"}. Only {@code AttributeValue#getS()} is read, so
 * non-string attributes render as {@code null}.
 *
 * <p>A single Scan call stops after 1 MB of data even when {@code Limit} has not been reached.
 * The scan is therefore continued from {@code LastEvaluatedKey} until 1000 items have been
 * collected or the table is exhausted.
 *
 * @return one formatted string per scanned item, at most 1000 entries
 */
public List<String> listUsers() {
    final int maxItems = 1000;
    List<String> users = new ArrayList<String>(maxItems);
    Map<String, AttributeValue> lastKey = null;
    do {
        ScanResult result = ddb.scan(new ScanRequest()
                .withTableName(USER_TABLE)
                .withLimit(maxItems - users.size())
                .withExclusiveStartKey(lastKey));
        for (Map<String, AttributeValue> item : result.getItems()) {
            // StringBuilder avoids quadratic string concatenation for wide items.
            StringBuilder s = new StringBuilder();
            for (Entry<String, AttributeValue> entry : item.entrySet()) {
                s.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
            }
            users.add(s.toString());
        }
        lastKey = result.getLastEvaluatedKey();
    } while (lastKey != null && users.size() < maxItems);
    return users;
}
/**
 * Returns a textual dump of up to 1000 items from the {@code DEVICE_TABLE} table.
 *
 * <p>Each returned string concatenates every attribute of one item as
 * {@code " ** <name> = <string value>"}. Only {@code AttributeValue#getS()} is read, so
 * non-string attributes render as {@code null}.
 *
 * <p>A single Scan call stops after 1 MB of data even when {@code Limit} has not been reached.
 * The scan is therefore continued from {@code LastEvaluatedKey} until 1000 items have been
 * collected or the table is exhausted.
 *
 * @return one formatted string per scanned item, at most 1000 entries
 */
public List<String> listDevices() {
    final int maxItems = 1000;
    List<String> devices = new ArrayList<String>(maxItems);
    Map<String, AttributeValue> lastKey = null;
    do {
        ScanResult result = ddb.scan(new ScanRequest()
                .withTableName(DEVICE_TABLE)
                .withLimit(maxItems - devices.size())
                .withExclusiveStartKey(lastKey));
        for (Map<String, AttributeValue> item : result.getItems()) {
            // StringBuilder avoids quadratic string concatenation for wide items.
            StringBuilder s = new StringBuilder();
            for (Entry<String, AttributeValue> entry : item.entrySet()) {
                s.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
            }
            devices.add(s.toString());
        }
        lastKey = result.getLastEvaluatedKey();
    } while (lastKey != null && devices.size() < maxItems);
    return devices;
}
/**
 * Issues one DynamoDB Scan call with user-agent tagging, read throttling, timing and metrics.
 *
 * <p>The API timer is stopped whether or not the call succeeds. Consumed-capacity and item-count
 * metrics are recorded only after a successful call.
 *
 * @param request the scan to execute; its user agent is set before sending
 * @param permitsToConsume permits handed to the read throttle before the call
 * @return the raw scan result
 * @throws BackendException whatever {@code processDynamoDbApiException} maps a failure to
 */
public ScanResult scan(final ScanRequest request, final int permitsToConsume) throws BackendException {
    setUserAgent(request);
    final String table = request.getTableName();
    timedReadThrottle(SCAN, table, permitsToConsume);

    final Timer.Context apiTimer = getTimerContext(SCAN, table);
    final ScanResult response;
    try {
        response = client.scan(request);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, SCAN, table);
    } finally {
        apiTimer.stop();
    }

    meterConsumedCapacity(SCAN, response.getConsumedCapacity());
    measureItemCount(SCAN, table, response.getCount());
    return response;
}
/**
 * Returns an iterator over all keys of this store, backed by a full-table scan.
 *
 * <p>A parallel scanner is used when the client enables parallel scan; otherwise a sequential one.
 */
@Override
public KeyIterator getKeys(final SliceQuery query, final StoreTransaction txh) throws BackendException {
    log.debug("Entering getKeys table:{} query:{} txh:{}", getTableName(), encodeForLog(query), txh);
    final ScanRequest scanRequest = super.createScanRequest();
    final Scanner scanner = client.isEnableParallelScan()
        ? client.getDelegate().getParallelScanCompletionService(scanRequest)
        : new SequentialScanner(client.getDelegate(), scanRequest);
    // SINGLE records are never split across scan results, so one interpreter type serves both
    // the sequential and the parallel scanner.
    final KeyIterator keys = new ScanBackedKeyIterator(scanner, new SingleRowScanInterpreter(query));
    log.debug("Exiting getKeys table:{} query:{} txh:{} returning:{}", getTableName(), encodeForLog(query), txh, keys);
    return keys;
}
/**
 * Returns an iterator over keys whose range key falls within {@code query}, backed by a filtered
 * table scan.
 *
 * <p>The scanner and interpreter are chosen together: parallel variants when the client enables
 * parallel scan, sequential variants otherwise.
 */
@Override
public KeyIterator getKeys(final SliceQuery query, final StoreTransaction txh) throws BackendException {
    log.debug("Entering getKeys table:{} query:{} txh:{}", getTableName(), encodeForLog(query), txh);
    final Expression rangeFilter = new FilterExpressionBuilder().rangeKey().range(query).build();
    final ScanRequest scanRequest = super.createScanRequest()
        .withFilterExpression(rangeFilter.getConditionExpression())
        .withExpressionAttributeValues(rangeFilter.getAttributeValues());

    final boolean parallel = client.isEnableParallelScan();
    final Scanner scanner = parallel
        ? client.getDelegate().getParallelScanCompletionService(scanRequest)
        : new SequentialScanner(client.getDelegate(), scanRequest);
    final ScanContextInterpreter interpreter = parallel
        ? new MultiRowParallelScanInterpreter(this, query)
        : new MultiRowSequentialScanInterpreter(this, query);

    final KeyIterator keys = new ScanBackedKeyIterator(scanner, interpreter);
    log.debug("Exiting getKeys table:{} query:{} txh:{} returning:{}", getTableName(), encodeForLog(query), txh, keys);
    return keys;
}
/**
 * Takes the next completed segment page and keeps that segment moving: if its worker has more
 * pages, the worker is resubmitted; otherwise the segment is marked finished and its future slot
 * cleared.
 *
 * @return the ScanContext of the page that just completed
 * @throws ExecutionException if the completed segment page threw while executing
 * @throws InterruptedException if interrupted while waiting for a page
 */
private ScanContext grab() throws ExecutionException, InterruptedException {
    // exec.take() waits for the next completed page (assumes CompletionService semantics — confirm);
    // get() on a completed future returns the same outcome every time, so it is read once here.
    final Future<ScanContext> completed = exec.take();
    final ScanContext context = completed.get();
    final int segment = context.getScanRequest().getSegment();
    final ScanSegmentWorker worker = workers[segment];
    if (!worker.hasNext()) {
        finishSegment(segment);
        currentFutures[segment] = null;
    } else {
        currentFutures[segment] = exec.submit(worker);
    }
    return context;
}
/**
 * Streams this instance's section of the table to {@code consumer}.
 *
 * <p>The section is scanned in parallel and every segment page is handed to the consumer. When
 * all segments are finished, this instance and then the consumer are shut down.
 *
 * @param consumer receives each {@code SegmentedScanResult}
 * @throws ExecutionException if a segment page failed
 * @throws InterruptedException if interrupted while waiting for a segment page
 */
public void pipe(final AbstractLogConsumer consumer)
        throws ExecutionException, InterruptedException {
    final DynamoDBTableScan tableScan = new DynamoDBTableScan(rateLimit, client);
    final ScanRequest template = new ScanRequest()
        .withTableName(tableName)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
        .withLimit(BootstrapConstants.SCAN_LIMIT)
        .withConsistentRead(consistentScan);
    final ParallelScanExecutor segmentsExecutor = tableScan.getParallelScanCompletionService(
        template, numSegments, threadPool, section, totalSections);

    while (!segmentsExecutor.finished()) {
        consumer.writeResult(segmentsExecutor.grab());
    }

    shutdown(true);
    consumer.shutdown(true);
}
/**
 * Splits {@code initialRequest} into parallel-scan segments and registers a
 * {@code ScanSegmentWorker} for every segment that belongs to {@code section}.
 *
 * <p>The segments (at least one) are divided into {@code totalSections} contiguous ranges of
 * {@code segments / totalSections} segments each. The last section also takes any remainder, so
 * every segment belongs to exactly one section.
 *
 * @param initialRequest request copied (via {@code copyScanRequest}) for each segment
 * @param numSegments requested number of segments; values below 1 are treated as 1
 * @param executor executor handed to the {@code ParallelScanExecutor}
 * @param section zero-based index of the section this caller scans
 * @param totalSections number of sections the segments are divided into
 * @return the parallel scan executor to grab results from as segment pages finish
 * @throws IllegalArgumentException if {@code totalSections < 1} or {@code section} is not in
 *     {@code [0, totalSections)}. Without this check, the first case divides by zero and the
 *     second produces segment numbers {@code >= TotalSegments}.
 */
public ParallelScanExecutor getParallelScanCompletionService(
        ScanRequest initialRequest, int numSegments, Executor executor,
        int section, int totalSections) {
    if (totalSections < 1) {
        throw new IllegalArgumentException(
            "totalSections must be at least 1 but was " + totalSections);
    }
    if (section < 0 || section >= totalSections) {
        throw new IllegalArgumentException(
            "section must be in [0, " + totalSections + ") but was " + section);
    }
    // A parallel scan needs at least one segment.
    final int segments = Math.max(1, numSegments);
    final ParallelScanExecutor completion = new ParallelScanExecutor(executor, segments);
    final int sectionSize = segments / totalSections;
    final int start = sectionSize * section;
    // The last section absorbs the remainder left by the integer division.
    final int end = (section + 1 == totalSections) ? segments : start + sectionSize;
    for (int segment = start; segment < end; segment++) {
        ScanRequest scanSegment = copyScanRequest(initialRequest)
            .withTotalSegments(segments).withSegment(segment);
        completion.addWorker(new ScanSegmentWorker(this.client,
            this.rateLimiter, scanSegment), segment);
    }
    return completion;
}
/**
 * Prints, for every table the client lists, Java statements that would re-insert each scanned
 * item.
 *
 * <p>Each item becomes base64-encoded {@code item.put(...)} lines followed by a {@code putItem}
 * and an {@code item.clear()}. Every table is scanned page by page until no
 * {@code LastEvaluatedKey} is returned.
 */
private void dumpTables() {
    for (String table : client.listTables().getTableNames()) {
        Map<String, AttributeValue> startKey = null;
        while (true) {
            final ScanResult page =
                client.scan(new ScanRequest().withTableName(table).withExclusiveStartKey(startKey));
            startKey = page.getLastEvaluatedKey();
            for (Map<String, AttributeValue> row : page.getItems()) {
                for (Map.Entry<String, AttributeValue> attribute : row.entrySet()) {
                    System.out.print("item.put(\"" + attribute.getKey() + "\", b642Av(\"");
                    System.out.println(
                        Base64.encodeAsString(AttributeValueMarshaller.marshall(attribute.getValue()).array())
                            + "\"));");
                }
                System.out.println("ddb.putItem(new PutItemRequest(\"" + table + "\", item));");
                System.out.println("item.clear();");
                System.out.println();
            }
            if (startKey == null) {
                break;
            }
        }
    }
}
/**
 * Scans one page of {@code tableName} and appends each item, deserialized as a data-index row,
 * to {@code resultList}.
 *
 * @param tableName table to scan
 * @param adapterId adapter id passed to the row deserializer
 * @param resultList list that receives the deserialized rows (mutated)
 * @param lastEvaluatedKey key to resume from; ignored when null or empty
 * @return the raw scan result, so the caller can continue from its LastEvaluatedKey
 */
private ScanResult getResults(
    final String tableName,
    final short adapterId,
    final List<GeoWaveRow> resultList,
    final Map<String, AttributeValue> lastEvaluatedKey) {
  final ScanRequest request = new ScanRequest(tableName);
  final boolean resuming = (lastEvaluatedKey != null) && !lastEvaluatedKey.isEmpty();
  if (resuming) {
    request.setExclusiveStartKey(lastEvaluatedKey);
  }
  final ScanResult result = client.scan(request);
  for (final Map<String, AttributeValue> row : result.getItems()) {
    final byte[] dataId = row.get(DynamoDBRow.GW_PARTITION_ID_KEY).getB().array();

    // A missing value attribute is passed on as null.
    byte[] value = null;
    final AttributeValue valueAttr = row.get(DynamoDBRow.GW_VALUE_KEY);
    if (valueAttr != null) {
      value = valueAttr.getB().array();
    }

    // A missing visibility attribute is passed on as an empty byte array.
    byte[] vis = new byte[0];
    final AttributeValue visAttr = row.get(DynamoDBRow.GW_VISIBILITY_KEY);
    if (visAttr != null) {
      vis = visAttr.getB().array();
    }

    resultList.add(DataIndexUtils.deserializeDataIndexRow(dataId, adapterId, value, vis));
  }
  return result;
}
/**
 * Returns a textual dump of up to 1000 items from the {@code DEVICE_TABLE} table.
 *
 * <p>Each returned string concatenates every attribute of one item as
 * {@code " ** <name> = <string value>"}. Only {@code AttributeValue#getS()} is read, so
 * non-string attributes render as {@code null}.
 *
 * <p>A single Scan call stops after 1 MB of data even when {@code Limit} has not been reached.
 * The scan is therefore continued from {@code LastEvaluatedKey} until 1000 items have been
 * collected or the table is exhausted.
 *
 * @return one formatted string per scanned item, at most 1000 entries
 */
public List<String> listDevices() {
    final int maxItems = 1000;
    List<String> devices = new ArrayList<String>(maxItems);
    Map<String, AttributeValue> lastKey = null;
    do {
        ScanResult result = ddb.scan(new ScanRequest()
                .withTableName(DEVICE_TABLE)
                .withLimit(maxItems - devices.size())
                .withExclusiveStartKey(lastKey));
        for (Map<String, AttributeValue> item : result.getItems()) {
            // StringBuilder avoids quadratic string concatenation for wide items.
            StringBuilder s = new StringBuilder();
            for (Entry<String, AttributeValue> entry : item.entrySet()) {
                s.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
            }
            devices.add(s.toString());
        }
        lastKey = result.getLastEvaluatedKey();
    } while (lastKey != null && devices.size() < maxItems);
    return devices;
}
/**
 * Returns a textual dump of up to 1000 items from the {@code USER_TABLE} table.
 *
 * <p>Each returned string concatenates every attribute of one item as
 * {@code " ** <name> = <string value>"}. Only {@code AttributeValue#getS()} is read, so
 * non-string attributes render as {@code null}.
 *
 * <p>A single Scan call stops after 1 MB of data even when {@code Limit} has not been reached.
 * The scan is therefore continued from {@code LastEvaluatedKey} until 1000 items have been
 * collected or the table is exhausted.
 *
 * @return one formatted string per scanned item, at most 1000 entries
 */
public List<String> listUsers() {
    final int maxItems = 1000;
    List<String> users = new ArrayList<String>(maxItems);
    Map<String, AttributeValue> lastKey = null;
    do {
        ScanResult result = ddb.scan(new ScanRequest()
                .withTableName(USER_TABLE)
                .withLimit(maxItems - users.size())
                .withExclusiveStartKey(lastKey));
        for (Map<String, AttributeValue> item : result.getItems()) {
            // StringBuilder avoids quadratic string concatenation for wide items.
            StringBuilder s = new StringBuilder();
            for (Entry<String, AttributeValue> entry : item.entrySet()) {
                s.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
            }
            users.add(s.toString());
        }
        lastKey = result.getLastEvaluatedKey();
    } while (lastKey != null && users.size() < maxItems);
    return users;
}
/**
 * Returns a textual dump of up to 1000 items from the {@code DEVICE_TABLE} table.
 *
 * <p>Each returned string concatenates every attribute of one item as
 * {@code " ** <name> = <string value>"}. Only {@code AttributeValue#getS()} is read, so
 * non-string attributes render as {@code null}.
 *
 * <p>A single Scan call stops after 1 MB of data even when {@code Limit} has not been reached.
 * The scan is therefore continued from {@code LastEvaluatedKey} until 1000 items have been
 * collected or the table is exhausted.
 *
 * @return one formatted string per scanned item, at most 1000 entries
 */
public List<String> listDevices() {
    final int maxItems = 1000;
    List<String> devices = new ArrayList<String>(maxItems);
    Map<String, AttributeValue> lastKey = null;
    do {
        ScanResult result = ddb.scan(new ScanRequest()
                .withTableName(DEVICE_TABLE)
                .withLimit(maxItems - devices.size())
                .withExclusiveStartKey(lastKey));
        for (Map<String, AttributeValue> item : result.getItems()) {
            // StringBuilder avoids quadratic string concatenation for wide items.
            StringBuilder s = new StringBuilder();
            for (Entry<String, AttributeValue> entry : item.entrySet()) {
                s.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
            }
            devices.add(s.toString());
        }
        lastKey = result.getLastEvaluatedKey();
    } while (lastKey != null && devices.size() < maxItems);
    return devices;
}
/**
 * Scans {@code tableName} and prints every item whose {@code Price} is below 100, projecting
 * only Id, Title, ProductCategory and Price. (Despite the method name, the threshold is 100.)
 *
 * <p>DynamoDB reads at most 1 MB per Scan call and applies the filter expression afterwards, so a
 * single call can silently miss matching items. The scan is therefore continued from each page's
 * {@code LastEvaluatedKey} until no key is returned.
 */
private static void findProductsForPriceLessThanZero() {
    Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
    expressionAttributeValues.put(":pr", new AttributeValue().withN("100"));
    ScanRequest scanRequest = new ScanRequest()
        .withTableName(tableName)
        .withFilterExpression("Price < :pr")
        .withExpressionAttributeValues(expressionAttributeValues)
        .withProjectionExpression("Id, Title, ProductCategory, Price");
    System.out.println("Scan of " + tableName + " for items with a price less than 100.");
    Map<String, AttributeValue> lastKey;
    do {
        ScanResult result = client.scan(scanRequest);
        for (Map<String, AttributeValue> item : result.getItems()) {
            System.out.println("");
            printItem(item);
        }
        // Resume the next page where this one stopped; null means the table is exhausted.
        lastKey = result.getLastEvaluatedKey();
        scanRequest.setExclusiveStartKey(lastKey);
    } while (lastKey != null);
}
/**
 * Derives an Arrow {@link Schema} for a table from a small sample scan.
 *
 * <p>The sample holds at most {@code SCHEMA_INFERENCE_NUM_RECORDS} items. Each attribute name is
 * mapped to an Arrow field the first time its value yields a non-null inferred field. If the
 * sample is empty, the schema is built only from the attribute definitions in the table's
 * metadata.
 *
 * @param tableName the table to derive a schema for
 * @param invoker the ThrottlingInvoker to call DDB with
 * @param ddbClient the DDB client to use
 * @return the table's derived schema
 */
public static Schema peekTableForSchema(String tableName, ThrottlingInvoker invoker, AmazonDynamoDB ddbClient)
        throws TimeoutException
{
    ScanRequest sampleScan = new ScanRequest().withTableName(tableName).withLimit(SCHEMA_INFERENCE_NUM_RECORDS);
    ScanResult sample = invoker.invoke(() -> ddbClient.scan(sampleScan));
    List<Map<String, AttributeValue>> rows = sample.getItems();
    SchemaBuilder schemaBuilder = new SchemaBuilder();

    if (rows.isEmpty()) {
        // Nothing to sample: fall back to the attribute definitions known from table metadata.
        DynamoDBTable table = getTable(tableName, invoker, ddbClient);
        for (AttributeDefinition definition : table.getKnownAttributeDefinitions()) {
            schemaBuilder.addField(DDBTypeUtils.getArrowFieldFromDDBType(definition.getAttributeName(), definition.getAttributeType()));
        }
        return schemaBuilder.build();
    }

    Set<String> seenColumns = new HashSet<>();
    for (Map<String, AttributeValue> row : rows) {
        for (Map.Entry<String, AttributeValue> column : row.entrySet()) {
            String columnName = column.getKey();
            if (seenColumns.contains(columnName)) {
                continue;
            }
            Field inferred = DDBTypeUtils.inferArrowField(columnName, ItemUtils.toSimpleValue(column.getValue()));
            // Columns whose type cannot be inferred are retried on later rows.
            if (inferred != null) {
                schemaBuilder.addField(inferred);
                seenColumns.add(columnName);
            }
        }
    }
    return schemaBuilder.build();
}
/**
 * Runs one Scan page projecting only the request's partition and sort keys, optionally
 * filtered, and returns the keys of the returned items as a single response string.
 */
@Override
public final Response scan(final Request request) {
    final String keysProjection = request.getPartitionKey() + ", " + request.getSortKey();
    final ScanRequest scanRequest = new ScanRequest()
        .withTableName(request.getTableName())
        .withProjectionExpression(keysProjection);

    if (request.getFilterData() != null) {
        // processFilterData fills both the expression text and its placeholder values.
        final StringBuilder filterText = new StringBuilder();
        final Map<String, AttributeValue> placeholderValues = new HashMap<>();
        processFilterData(request, filterText, placeholderValues);
        scanRequest.setFilterExpression(filterText.toString());
        scanRequest.setExpressionAttributeValues(placeholderValues);
    }

    final ScanResult scanResult = dynamoDBClient.scan(scanRequest);
    final StringBuilder body = new StringBuilder("PK of items read with scan (V2): ");
    for (Map<String, AttributeValue> item : scanResult.getItems()) {
        body.append(prepareKeyStr(item, request));
    }
    return new Response(body.toString(), null);
}
/**
 * Counts the items in the {@code starwars} table, prints the total and returns it.
 *
 * <p>A single Scan call reads at most 1 MB of data, so the {@code Count} of one call undercounts
 * larger tables. The scan is continued from {@code LastEvaluatedKey} until the table is exhausted.
 * {@code Select=COUNT} asks DynamoDB to return only the count, not the items themselves.
 *
 * @return total number of items in the table
 */
private static int getCharactersCount() {
    ScanRequest scanRequest = new ScanRequest()
            .withTableName("starwars")
            .withSelect("COUNT");
    int total = 0;
    ScanResult result;
    do {
        result = getClient().scan(scanRequest);
        total += result.getCount();
        // Resume the next page where this one stopped; null ends the loop.
        scanRequest.setExclusiveStartKey(result.getLastEvaluatedKey());
    } while (result.getLastEvaluatedKey() != null);
    System.out.println("Total number of characters found: " + total);
    return total;
}
/**
 * Scans the whole table with consistent reads, applying the filter expression generated from
 * {@code filter}.
 *
 * <p>Every page is collected by following {@code LastEvaluatedKey}, and each item is converted
 * with {@code fromMap}. Results are reversed when {@code filter.reverse} is set.
 *
 * <p>Note: {@code filter.parsedAttributeCondition.get()} is called unchecked, so it must be present.
 */
private Stream<Entry> scan(SecretEventStream.Filter<Entry> filter, Converters converters) {
    final ScanRequest scanRequest = new ScanRequest()
        .withConsistentRead(true)
        .withTableName(tableName);
    final FilterGenerator.Filter generated =
        new FilterGenerator().process(filter.parsedAttributeCondition.get(), converters);
    // DynamoDB rejects empty expression-attribute maps, so only set the non-empty ones.
    if (!generated.expressionAttributeNames.isEmpty()) {
        scanRequest.setExpressionAttributeNames(generated.expressionAttributeNames);
    }
    if (!generated.expressionAttributeValues.isEmpty()) {
        scanRequest.setExpressionAttributeValues(generated.expressionAttributeValues);
    }
    scanRequest.setFilterExpression(generated.filterExpression);

    final List<Map<String, AttributeValue>> items = new ArrayList<>();
    ScanResult page = client.scan(scanRequest);
    items.addAll(page.getItems());
    while (page.getLastEvaluatedKey() != null) {
        scanRequest.setExclusiveStartKey(page.getLastEvaluatedKey());
        page = client.scan(scanRequest);
        items.addAll(page.getItems());
    }

    final Stream<Entry> entries = items.stream().map(this::fromMap);
    if (!filter.reverse) {
        return entries;
    }
    return Lists.reverse(entries.collect(Collectors.toCollection(LinkedList::new))).stream();
}
/**
 * Verifies that keySet() performs one consistent scan and returns the identifiers of both stored
 * secrets.
 */
@Test
public void testKeySet() throws Exception {
    ScanRequest request = new ScanRequest().withConsistentRead(true).withTableName(tableName);
    ScanResult result = constructScanResult();
    when(mockDynamoDBClient.scan(request)).thenReturn(result);
    // Call the KeySet method and assert the expected secret identifiers are returned.
    Set<SecretIdentifier> keys = dynamoDB.keySet();
    // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
    assertEquals(2, keys.size());
    assertTrue(keys.contains(new SecretIdentifier(SECRET_NAME)));
    assertTrue(keys.contains(new SecretIdentifier(SECRET2_NAME)));
    verify(mockDynamoDBClient, times(1)).scan(request);
}
/**
 * Verifies that keySet() performs one consistent scan and returns an empty set when the table
 * has no items.
 */
@Test
public void testKeySetEmpty() throws Exception {
    ScanRequest expectedRequest = new ScanRequest().withConsistentRead(true).withTableName(tableName);
    when(mockDynamoDBClient.scan(expectedRequest))
        .thenReturn(new ScanResult().withCount(0).withItems(new ArrayList<>()));

    Set<SecretIdentifier> identifiers = dynamoDB.keySet();

    assertTrue(identifiers.isEmpty());
    verify(mockDynamoDBClient, times(1)).scan(expectedRequest);
}
/**
 * Scans one parallel-scan segment page by page ({@code itemLimit} items per request).
 *
 * <p>Each page is handed to {@code processScanResult}. Service errors are printed to stderr
 * rather than propagated, and a summary line is always printed at the end.
 */
@Override
public void run() {
    System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments
        + " segments " + itemLimit + " items at a time...");
    int requestCount = 0;
    int scannedCount = 0;
    try {
        Map<String, AttributeValue> startKey = null;
        do {
            ScanResult page = client.scan(new ScanRequest()
                .withTableName(tableName)
                .withLimit(itemLimit)
                .withExclusiveStartKey(startKey)
                .withTotalSegments(totalSegments)
                .withSegment(segment));
            requestCount++;
            scannedCount += page.getScannedCount();
            // print items returned from scan request
            processScanResult(segment, page);
            startKey = page.getLastEvaluatedKey();
        } while (startKey != null);
    } catch (AmazonServiceException ase) {
        System.err.println(ase.getMessage());
    } finally {
        System.out.println("Scanned " + scannedCount + " items from segment " + segment + " out of "
            + totalSegments + " of " + tableName + " with " + requestCount + " scan requests");
    }
}
/**
 * Fans a read spec out into one spec per scan segment, numbered {@code 0 .. TotalSegments-1}.
 */
@ProcessElement
public void processElement(@Element Read<T> spec, OutputReceiver<Read<T>> out) {
  final int segmentCount = spec.getScanRequestFn().apply(null).getTotalSegments();
  for (int segmentId = 0; segmentId < segmentCount; segmentId++) {
    out.output(spec.withSegmentId(segmentId));
  }
}
/**
 * Reads the scan segment assigned to {@code spec} and emits one mapped output per Scan page.
 *
 * <p>A single DynamoDB Scan call returns at most 1 MB of data (or {@code Limit} items). Reading
 * only the first call would silently drop the rest of the segment, so the segment is read page by
 * page, following {@code LastEvaluatedKey} until it is absent or empty.
 */
@ProcessElement
public void processElement(@Element Read<T> spec, OutputReceiver<T> out) {
  AmazonDynamoDB client = spec.getAwsClientsProvider().createDynamoDB();
  ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
  scanRequest.setSegment(spec.getSegmentId());
  ScanResult scanResult;
  do {
    scanResult = client.scan(scanRequest);
    out.output(spec.getScanResultMapperFn().apply(scanResult));
    // Resume the next page of this segment where the current one stopped.
    scanRequest.setExclusiveStartKey(scanResult.getLastEvaluatedKey());
  } while (scanResult.getLastEvaluatedKey() != null
      && !scanResult.getLastEvaluatedKey().isEmpty());
}
/**
 * Writes {@code numOfItems} generated items into {@code tableName} with one batch write, then
 * scans the table and asserts that exactly that many items came back.
 *
 * @return the items returned by the verification scan
 */
static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) {
  dynamoDBClient.batchWriteItem(generateBatchWriteItemRequest(tableName, numOfItems));
  final List<Map<String, AttributeValue>> scanned =
      dynamoDBClient.scan(new ScanRequest().withTableName(tableName)).getItems();
  Assert.assertEquals(numOfItems, scanned.size());
  return scanned;
}
/**
 * Reads the test table through DynamoDBIO as a single segment and checks that the emitted item
 * lists match {@code expected}.
 */
@Test
public void testReadScanResult() {
  final SerializableFunction<Void, ScanRequest> singleSegmentScan =
      input -> new ScanRequest(tableName).withTotalSegments(1);
  final PCollection<List<Map<String, AttributeValue>>> actual =
      pipeline.apply(
          DynamoDBIO.<List<Map<String, AttributeValue>>>read()
              .withAwsClientsProvider(
                  AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))
              .withScanRequestFn(singleSegmentScan)
              .items());

  PAssert.that(actual).containsInAnyOrder(expected);
  pipeline.run().waitUntilFinish();
}