下面列出了 com.amazonaws.services.dynamodbv2.model.ScanResult 类的 API 用法示例代码及写法;也可以点击链接到 GitHub 查看源代码。
/**
 * Issues one scan request against {@code tableName} through the retry driver.
 *
 * <p>The request carries the given start key, item limit, segment and total-segment values and asks
 * for total consumed capacity. A scan filter is attached only when a query filter is supplied and
 * its scan-filter map is non-empty.
 *
 * @param tableName           table to scan
 * @param dynamoDBQueryFilter optional filter; may be {@code null}
 * @param segment             segment value placed on the request
 * @param totalSegments       total-segments value placed on the request
 * @param exclusiveStartKey   key to resume from
 * @param limit               item limit; must fit in an {@code int} ({@code Ints.checkedCast})
 * @param reporter            passed through to the retry driver
 * @return whatever the retry driver returns for the scan call
 */
public RetryResult<ScanResult> scanTable(
    String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer
    totalSegments, Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
  final ScanRequest scanRequest = new ScanRequest(tableName)
      .withExclusiveStartKey(exclusiveStartKey)
      .withLimit(Ints.checkedCast(limit))
      .withSegment(segment)
      .withTotalSegments(totalSegments)
      .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

  if (dynamoDBQueryFilter != null) {
    final Map<String, Condition> conditions = dynamoDBQueryFilter.getScanFilter();
    // An empty filter map is never attached to the request.
    if (!conditions.isEmpty()) {
      scanRequest.setScanFilter(conditions);
    }
  }

  final Callable<ScanResult> scanCall = new Callable<ScanResult>() {
    @Override
    public ScanResult call() {
      log.debug("Executing DynamoDB scan: " + scanRequest);
      return dynamoDB.scan(scanRequest);
    }
  };
  return getRetryDriver().runWithRetry(scanCall, reporter, PrintCounter.DynamoDBReadThrottle);
}
/**
 * Reads one page of scan results for this request's segment.
 *
 * @param lim request limit; only {@code lim.items} is used here, as the scan's item limit
 * @return the page's items, its last evaluated key, the consumed capacity units (0.0 when the
 *         result reports no consumed capacity) and the retry count of the call
 */
@Override
protected PageResults<Map<String, AttributeValue>> fetchPage(RequestLimit lim) {
  final RetryResult<ScanResult> outcome = context.getClient().scanTable(
      tableName,
      null,
      segment,
      context.getSplit().getTotalSegments(),
      lastEvaluatedKey,
      lim.items,
      context.getReporter());
  final ScanResult page = outcome.result;

  // Consumed capacity is optional on the result; treat its absence as zero.
  final double consumedCapacityUnits = page.getConsumedCapacity() == null
      ? 0.0
      : page.getConsumedCapacity().getCapacityUnits();

  return new PageResults<>(
      page.getItems(), page.getLastEvaluatedKey(), consumedCapacityUnits, outcome.retries);
}
/**
 * Scans {@code tableName} for items whose {@code Price} is below 100 and prints every match.
 *
 * <p>NOTE(review): the method name says "less than zero" but the filter threshold is 100; the name
 * is kept so existing callers still compile.
 *
 * <p>The scan is followed across pages via {@code LastEvaluatedKey}. The filter expression is
 * applied per page, so a single call can return few or no matches while more remain on later pages.
 */
private static void findProductsForPriceLessThanZero() {
  Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
  expressionAttributeValues.put(":pr", new AttributeValue().withN("100"));

  ScanRequest scanRequest = new ScanRequest().withTableName(tableName).withFilterExpression("Price < :pr")
      .withExpressionAttributeValues(expressionAttributeValues)
      .withProjectionExpression("Id, Title, ProductCategory, Price");

  // First page is fetched before the header is printed, matching the original ordering.
  ScanResult result = client.scan(scanRequest);
  System.out.println("Scan of " + tableName + " for items with a price less than 100.");

  while (true) {
    for (Map<String, AttributeValue> item : result.getItems()) {
      System.out.println("");
      printItem(item);
    }
    Map<String, AttributeValue> lastKey = result.getLastEvaluatedKey();
    if (lastKey == null || lastKey.isEmpty()) {
      break; // no more pages
    }
    result = client.scan(scanRequest.withExclusiveStartKey(lastKey));
  }
}
/**
 * Lists the items stored in the table named by {@code USER_TABLE}, one formatted string per item.
 *
 * <p>Despite the method name, an entry is not a bare username: it is the concatenation of every
 * attribute of the item, each rendered as {@code " ** <name> = <value>"} where the value is read
 * with {@code getS()} (presumably {@code null} for non-string attributes, which then renders as
 * the text "null").
 *
 * <p>Only a single scan call with a limit of 1000 is made; further pages are not fetched.
 *
 * @return one formatted string per scanned item
 */
public List<String> listUsers() {
  List<String> users = new ArrayList<String>(1000);
  ScanResult result = ddb.scan(new ScanRequest().withTableName(USER_TABLE).withLimit(1000));

  for (Map<String, AttributeValue> item : result.getItems()) {
    // StringBuilder avoids re-copying the growing string for every attribute.
    StringBuilder line = new StringBuilder();
    for (Entry<String, AttributeValue> entry : item.entrySet()) {
      line.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
    }
    users.add(line.toString());
  }
  return users;
}
/**
 * Lists the items stored in the table named by {@code DEVICE_TABLE}, one formatted string per item.
 *
 * <p>An entry is not a bare device ID: it is the concatenation of every attribute of the item,
 * each rendered as {@code " ** <name> = <value>"} where the value is read with {@code getS()}
 * (presumably {@code null} for non-string attributes, which then renders as the text "null").
 *
 * <p>Only a single scan call with a limit of 1000 is made; further pages are not fetched.
 *
 * @return one formatted string per scanned item
 */
public List<String> listDevices() {
  List<String> devices = new ArrayList<String>(1000);
  ScanResult result = ddb.scan(new ScanRequest().withTableName(DEVICE_TABLE).withLimit(1000));

  for (Map<String, AttributeValue> item : result.getItems()) {
    // StringBuilder avoids re-copying the growing string for every attribute.
    StringBuilder line = new StringBuilder();
    for (Entry<String, AttributeValue> entry : item.entrySet()) {
      line.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
    }
    devices.add(line.toString());
  }
  return devices;
}
/**
 * Runs a scan through the wrapped client, with throttling, timing and metering around the call.
 *
 * <p>Sequence: set the user agent on the request, call {@code timedReadThrottle} with the requested
 * permits, start the API timer, execute the scan, stop the timer (always), then report consumed
 * capacity and item count for the result.
 *
 * @param request          scan request to execute
 * @param permitsToConsume permits handed to {@code timedReadThrottle}
 * @return the scan result
 * @throws BackendException the translation of any exception thrown by the client call, as produced
 *                          by {@code processDynamoDbApiException}
 */
public ScanResult scan(final ScanRequest request, final int permitsToConsume) throws BackendException {
  setUserAgent(request);
  final String table = request.getTableName();
  timedReadThrottle(SCAN, table, permitsToConsume);

  final Timer.Context timer = getTimerContext(SCAN, table);
  final ScanResult scanned;
  try {
    scanned = client.scan(request);
  } catch (Exception e) {
    throw processDynamoDbApiException(e, SCAN, table);
  } finally {
    // The timer is stopped on both the success and the failure path.
    timer.stop();
  }

  meterConsumedCapacity(SCAN, scanned.getConsumedCapacity());
  measureItemCount(SCAN, table, scanned.getCount());
  return scanned;
}
/**
 * Fetches the next page of the scan and advances the iterator state.
 *
 * <p>After the call, {@code lastConsumedCapacity} holds the integer part of the page's consumed
 * capacity (when reported), {@code hasNext} reflects whether the page carried a non-empty last
 * evaluated key, and the request's exclusive start key points at that key when there is one.
 *
 * @return the page just fetched
 * @throws BackendRuntimeException wrapping any {@code BackendException} from the backoff scan
 */
@SuppressFBWarnings(value = "IT_NO_SUCH_ELEMENT",
    justification = "https://github.com/awslabs/dynamodb-janusgraph-storage-backend/issues/222")
@Override
public ScanResult next() {
  final Scan backoff = new Scan(request, delegate, lastConsumedCapacity);
  final ScanResult page;
  try {
    // Non-null on return; failures surface as BackendException.
    page = backoff.runWithBackoff();
  } catch (BackendException e) {
    throw new BackendRuntimeException(e);
  }

  if (page.getConsumedCapacity() != null) {
    lastConsumedCapacity = page.getConsumedCapacity().getCapacityUnits().intValue();
  }

  hasNext = page.getLastEvaluatedKey() != null && !page.getLastEvaluatedKey().isEmpty();
  if (hasNext) {
    // Resume the following call right after the last key of this page.
    request.setExclusiveStartKey(page.getLastEvaluatedKey());
  }
  return page;
}
/**
 * Runs the scan, retrying with a doubling sleep until a non-null result is obtained.
 *
 * <p>Every {@link Exception} from the client triggers a sleep of {@code exponentialBackoffTime}
 * followed by a retry, and the backoff time is doubled after each failed attempt whether or not
 * the sleep was interrupted. An interrupt does not stop the retries; it is remembered and the
 * thread's interrupt flag is restored before returning.
 *
 * <p>NOTE(review): because all exceptions are retried, a non-transient failure makes this loop
 * forever; confirm that is intended.
 *
 * @return the first non-null scan result
 */
public ScanResult runWithBackoff() {
  boolean sawInterrupt = false;
  try {
    while (true) {
      ScanResult attempt = null;
      try {
        attempt = client.scan(request);
      } catch (Exception e) {
        try {
          Thread.sleep(exponentialBackoffTime);
        } catch (InterruptedException ie) {
          sawInterrupt = true;
        } finally {
          exponentialBackoffTime *= 2;
        }
      }
      if (attempt != null) {
        return attempt;
      }
    }
  } finally {
    if (sawInterrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Prints, for every table the client lists, Java statements that would re-create each item.
 *
 * <p>For each item one {@code item.put(...)} line is printed per attribute, with the attribute
 * value marshalled and Base64-encoded, followed by a {@code ddb.putItem(...)} line, an
 * {@code item.clear();} line and a blank line. Each table is scanned page by page until no last
 * evaluated key is returned.
 */
private void dumpTables() {
  for (String table : client.listTables().getTableNames()) {
    Map<String, AttributeValue> startKey = null;
    do {
      final ScanResult page =
          client.scan(new ScanRequest().withTableName(table).withExclusiveStartKey(startKey));
      startKey = page.getLastEvaluatedKey();

      for (Map<String, AttributeValue> row : page.getItems()) {
        for (Map.Entry<String, AttributeValue> attr : row.entrySet()) {
          final String encoded =
              Base64.encodeAsString(AttributeValueMarshaller.marshall(attr.getValue()).array());
          System.out.println("item.put(\"" + attr.getKey() + "\", b642Av(\"" + encoded + "\"));");
        }
        System.out.println("ddb.putItem(new PutItemRequest(\"" + table + "\", item));");
        System.out.println("item.clear();");
        System.out.println();
      }
    } while (startKey != null);
  }
}
/**
 * Reads every row of the data-index table for {@code typeName} into memory and returns an
 * iterator over them.
 *
 * <p>Pages are fetched with {@code getResults} until a page comes back without a (non-empty) last
 * evaluated key; each call appends its rows to the shared result list.
 *
 * @param adapterId adapter id passed to row deserialization
 * @param typeName  type name used as the table-name prefix
 * @return iterator over all collected rows
 */
public Iterator<GeoWaveRow> getRowsFromDataIndex(final short adapterId, final String typeName) {
  final List<GeoWaveRow> rows = new ArrayList<>();
  Map<String, AttributeValue> resumeKey = null;
  do {
    final ScanResult page =
        getResults(
            typeName + "_" + getQualifiedTableName(DataIndexUtils.DATA_ID_INDEX.getName()),
            adapterId,
            rows,
            resumeKey);
    resumeKey = page.getLastEvaluatedKey();
  } while ((resumeKey != null) && !resumeKey.isEmpty());
  return rows.iterator();
}
/**
 * Scans one page of {@code tableName} and appends the deserialized rows to {@code resultList}.
 *
 * @param tableName        table to scan
 * @param adapterId        adapter id passed to row deserialization
 * @param resultList       list that receives the rows of this page
 * @param lastEvaluatedKey key to resume from; ignored when {@code null} or empty
 * @return the raw scan result, so the caller can inspect its last evaluated key
 */
private ScanResult getResults(
    final String tableName,
    final short adapterId,
    final List<GeoWaveRow> resultList,
    final Map<String, AttributeValue> lastEvaluatedKey) {
  final ScanRequest request = new ScanRequest(tableName);
  final boolean resuming = (lastEvaluatedKey != null) && !lastEvaluatedKey.isEmpty();
  if (resuming) {
    request.setExclusiveStartKey(lastEvaluatedKey);
  }

  final ScanResult result = client.scan(request);
  for (final Map<String, AttributeValue> row : result.getItems()) {
    final byte[] dataId = row.get(DynamoDBRow.GW_PARTITION_ID_KEY).getB().array();

    // Value and visibility are optional: a missing value stays null, missing visibility is empty.
    final AttributeValue valueAttr = row.get(DynamoDBRow.GW_VALUE_KEY);
    byte[] value = null;
    if (valueAttr != null) {
      value = valueAttr.getB().array();
    }

    final AttributeValue visAttr = row.get(DynamoDBRow.GW_VISIBILITY_KEY);
    byte[] vis = new byte[0];
    if (visAttr != null) {
      vis = visAttr.getB().array();
    }

    resultList.add(DataIndexUtils.deserializeDataIndexRow(dataId, adapterId, value, vis));
  }
  return result;
}
/**
 * Lists the items stored in the table named by {@code DEVICE_TABLE}, one formatted string per item.
 *
 * <p>An entry is not a bare device ID: it is the concatenation of every attribute of the item,
 * each rendered as {@code " ** <name> = <value>"} where the value is read with {@code getS()}
 * (presumably {@code null} for non-string attributes, which then renders as the text "null").
 *
 * <p>Only a single scan call with a limit of 1000 is made; further pages are not fetched.
 *
 * @return one formatted string per scanned item
 */
public List<String> listDevices() {
  List<String> devices = new ArrayList<String>(1000);
  ScanResult result = ddb.scan(new ScanRequest().withTableName(DEVICE_TABLE).withLimit(1000));

  for (Map<String, AttributeValue> item : result.getItems()) {
    // StringBuilder avoids re-copying the growing string for every attribute.
    StringBuilder line = new StringBuilder();
    for (Entry<String, AttributeValue> entry : item.entrySet()) {
      line.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
    }
    devices.add(line.toString());
  }
  return devices;
}
/**
 * Lists the items stored in the table named by {@code USER_TABLE}, one formatted string per item.
 *
 * <p>Despite the method name, an entry is not a bare username: it is the concatenation of every
 * attribute of the item, each rendered as {@code " ** <name> = <value>"} where the value is read
 * with {@code getS()} (presumably {@code null} for non-string attributes, which then renders as
 * the text "null").
 *
 * <p>Only a single scan call with a limit of 1000 is made; further pages are not fetched.
 *
 * @return one formatted string per scanned item
 */
public List<String> listUsers() {
  List<String> users = new ArrayList<String>(1000);
  ScanResult result = ddb.scan(new ScanRequest().withTableName(USER_TABLE).withLimit(1000));

  for (Map<String, AttributeValue> item : result.getItems()) {
    // StringBuilder avoids re-copying the growing string for every attribute.
    StringBuilder line = new StringBuilder();
    for (Entry<String, AttributeValue> entry : item.entrySet()) {
      line.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
    }
    users.add(line.toString());
  }
  return users;
}
/**
 * Lists the items stored in the table named by {@code DEVICE_TABLE}, one formatted string per item.
 *
 * <p>An entry is not a bare device ID: it is the concatenation of every attribute of the item,
 * each rendered as {@code " ** <name> = <value>"} where the value is read with {@code getS()}
 * (presumably {@code null} for non-string attributes, which then renders as the text "null").
 *
 * <p>Only a single scan call with a limit of 1000 is made; further pages are not fetched.
 *
 * @return one formatted string per scanned item
 */
public List<String> listDevices() {
  List<String> devices = new ArrayList<String>(1000);
  ScanResult result = ddb.scan(new ScanRequest().withTableName(DEVICE_TABLE).withLimit(1000));

  for (Map<String, AttributeValue> item : result.getItems()) {
    // StringBuilder avoids re-copying the growing string for every attribute.
    StringBuilder line = new StringBuilder();
    for (Entry<String, AttributeValue> entry : item.entrySet()) {
      line.append(" ** ").append(entry.getKey()).append(" = ").append(entry.getValue().getS());
    }
    devices.add(line.toString());
  }
  return devices;
}
/**
 * Scans {@code tableName} for items whose {@code Price} is below 100 and prints every match.
 *
 * <p>NOTE(review): the method name says "less than zero" but the filter threshold is 100; the name
 * is kept so existing callers still compile.
 *
 * <p>The scan is followed across pages via {@code LastEvaluatedKey}. The filter expression is
 * applied per page, so a single call can return few or no matches while more remain on later pages.
 */
private static void findProductsForPriceLessThanZero() {
  Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
  expressionAttributeValues.put(":pr", new AttributeValue().withN("100"));

  ScanRequest scanRequest = new ScanRequest()
      .withTableName(tableName)
      .withFilterExpression("Price < :pr")
      .withExpressionAttributeValues(expressionAttributeValues)
      .withProjectionExpression("Id, Title, ProductCategory, Price");

  // First page is fetched before the header is printed, matching the original ordering.
  ScanResult result = client.scan(scanRequest);
  System.out.println("Scan of " + tableName + " for items with a price less than 100.");

  while (true) {
    for (Map<String, AttributeValue> item : result.getItems()) {
      System.out.println("");
      printItem(item);
    }
    Map<String, AttributeValue> lastKey = result.getLastEvaluatedKey();
    if (lastKey == null || lastKey.isEmpty()) {
      break; // no more pages
    }
    result = client.scan(scanRequest.withExclusiveStartKey(lastKey));
  }
}
/**
 * Builds an Arrow {@link Schema} for a table by sampling a limited scan
 * ({@code SCHEMA_INFERENCE_NUM_RECORDS} items) and inferring one field per distinct attribute name.
 * The first occurrence of an attribute that yields a non-null inferred field decides its type.
 * When the scan returns no items, the schema is built from the attribute definitions in the
 * table's metadata instead.
 *
 * @param tableName the table to derive a schema for
 * @param invoker the ThrottlingInvoker to call DDB with
 * @param ddbClient the DDB client to use
 * @return the table's derived schema
 */
public static Schema peekTableForSchema(String tableName, ThrottlingInvoker invoker, AmazonDynamoDB ddbClient)
        throws TimeoutException
{
    ScanRequest scanRequest = new ScanRequest().withTableName(tableName).withLimit(SCHEMA_INFERENCE_NUM_RECORDS);
    ScanResult scanResult = invoker.invoke(() -> ddbClient.scan(scanRequest));
    List<Map<String, AttributeValue>> items = scanResult.getItems();
    SchemaBuilder schemaBuilder = new SchemaBuilder();

    if (items.isEmpty()) {
        // there's no items, so use any attributes defined in the table metadata
        DynamoDBTable table = getTable(tableName, invoker, ddbClient);
        for (AttributeDefinition definition : table.getKnownAttributeDefinitions()) {
            schemaBuilder.addField(
                    DDBTypeUtils.getArrowFieldFromDDBType(definition.getAttributeName(), definition.getAttributeType()));
        }
        return schemaBuilder.build();
    }

    Set<String> seenColumns = new HashSet<>();
    for (Map<String, AttributeValue> item : items) {
        for (Map.Entry<String, AttributeValue> column : item.entrySet()) {
            String columnName = column.getKey();
            if (seenColumns.contains(columnName)) {
                continue;
            }
            Field field = DDBTypeUtils.inferArrowField(columnName, ItemUtils.toSimpleValue(column.getValue()));
            // A column is only marked as seen once a field could be inferred for it,
            // so a later item gets another chance to supply a usable value.
            if (field != null) {
                schemaBuilder.addField(field);
                seenColumns.add(columnName);
            }
        }
    }
    return schemaBuilder.build();
}
/**
 * Scans the request's table, projecting only the partition and sort key attributes, and returns
 * a response whose text lists the keys of the items read.
 *
 * <p>When the request carries filter data, {@code processFilterData} fills in a filter expression
 * and its attribute values, which are then attached to the scan.
 *
 * @param request describes the table, key attribute names and optional filter data
 * @return a response containing the formatted keys; its second constructor argument is {@code null}
 */
@Override
public final Response scan(final Request request) {
  final String projectionExpression = request.getPartitionKey() + ", " + request.getSortKey();
  final ScanRequest scanRequest = new ScanRequest()
      .withTableName(request.getTableName())
      .withProjectionExpression(projectionExpression);

  if (request.getFilterData() != null) {
    final StringBuilder filterExpression = new StringBuilder();
    final Map<String, AttributeValue> filterValues = new HashMap<>();
    processFilterData(request, filterExpression, filterValues);
    // Add to ScanRequest.
    scanRequest
        .withFilterExpression(filterExpression.toString())
        .withExpressionAttributeValues(filterValues);
  }

  final ScanResult scanResult = dynamoDBClient.scan(scanRequest);
  final StringBuilder response = new StringBuilder("PK of items read with scan (V2): ");
  for (Map<String, AttributeValue> item : scanResult.getItems()) {
    response.append(prepareKeyStr(item, request));
  }
  return new Response(response.toString(), null);
}
/**
 * Counts the items in the {@code starwars} table.
 *
 * <p>A single scan call returns at most one page (up to 1 MB of data), so the scan is repeated
 * from each page's last evaluated key and the per-page counts are summed to give an accurate total.
 *
 * @return the total number of items across all scan pages
 */
private static int getCharactersCount() {
  ScanRequest scanRequest = new ScanRequest()
      .withTableName("starwars");

  int total = 0;
  while (true) {
    ScanResult result = getClient().scan(scanRequest);
    total += result.getCount();

    if (result.getLastEvaluatedKey() == null || result.getLastEvaluatedKey().isEmpty()) {
      break; // last page reached
    }
    scanRequest.setExclusiveStartKey(result.getLastEvaluatedKey());
  }

  System.out.println("Total number of characters found: " + total);
  return total;
}
/**
 * Bundles the inputs of one test case: a table schema with a v2 scan response, and a v1 bean class
 * with a v1 scan result. The four arguments are stored as given.
 *
 * @param schema      table schema stored in {@code this.schema}
 * @param v2Response  v2 scan response stored in {@code this.v2Response}
 * @param v1BeanClass v1 bean class stored in {@code this.v1BeanClass}
 * @param v1Response  v1 scan result stored in {@code this.v1Response}
 */
TestItem(TableSchema<?> schema, ScanResponse v2Response, Class<?> v1BeanClass, ScanResult v1Response) {
    // v1 side
    this.v1BeanClass = v1BeanClass;
    this.v1Response = v1Response;
    // v2 side
    this.schema = schema;
    this.v2Response = v2Response;
}
/**
 * Verifies that {@code keySet()} returns both secret identifiers present in the stubbed scan
 * result, and that the client is scanned exactly once with a consistent-read request.
 */
@Test
public void testKeySet() throws Exception {
    ScanRequest expectedScan = new ScanRequest().withConsistentRead(true).withTableName(tableName);
    ScanResult stubbedScan = constructScanResult();
    when(mockDynamoDBClient.scan(expectedScan)).thenReturn(stubbedScan);

    // Exercise keySet() and check that exactly the two expected identifiers come back.
    Set<SecretIdentifier> identifiers = dynamoDB.keySet();

    assertEquals(identifiers.size(), 2);
    assertTrue(identifiers.contains(new SecretIdentifier(SECRET_NAME)));
    assertTrue(identifiers.contains(new SecretIdentifier(SECRET2_NAME)));
    verify(mockDynamoDBClient, times(1)).scan(expectedScan);
}
/**
 * Verifies that {@code keySet()} is empty when the scan returns no items, and that the client is
 * scanned exactly once with a consistent-read request.
 */
@Test
public void testKeySetEmpty() throws Exception {
    ScanRequest expectedScan = new ScanRequest().withConsistentRead(true).withTableName(tableName);
    ScanResult emptyScan = new ScanResult().withCount(0).withItems(new ArrayList<>());
    when(mockDynamoDBClient.scan(expectedScan)).thenReturn(emptyScan);

    // Exercise keySet() against the empty scan result.
    Set<SecretIdentifier> identifiers = dynamoDB.keySet();

    assertTrue(identifiers.isEmpty());
    verify(mockDynamoDBClient, times(1)).scan(expectedScan);
}
/**
 * Verifies that {@code fetchPage} reports 0.0 consumed read capacity when the scan result carries
 * no consumed-capacity information.
 */
@Test
public void fetchPageReturnsZeroConsumedCapacityWhenResultsConsumedCapacityIsNull() {
  // Parameterized instead of a raw RetryResult so the stub call below is type-checked.
  RetryResult<ScanResult> stubbedResult = new RetryResult<>(new ScanResult().withConsumedCapacity(null)
      .withItems(new HashMap<String, AttributeValue>()), 0);
  stubScanTableWith(stubbedResult);

  when(context.getClient()).thenReturn(client);
  when(context.getConf()).thenReturn(new JobConf());
  when(context.getSplit()).thenReturn(new DynamoDBSegmentsSplit());
  ScanReadManager readManager = Mockito.mock(ScanReadManager.class);
  ScanRecordReadRequest readRequest = new ScanRecordReadRequest(readManager, context, 0, null);

  PageResults<Map<String, AttributeValue>> pageResults =
      readRequest.fetchPage(new RequestLimit(0, 0));

  assertEquals(0.0, pageResults.consumedRcu, 0.0);
}
/**
 * Stubs {@code client.scanTable(...)} to return the given result, using a type-based argument
 * matcher for each of its seven parameters.
 *
 * @param scanResultRetryResult the value the stubbed {@code scanTable} call returns
 */
private void stubScanTableWith(RetryResult<ScanResult> scanResultRetryResult) {
  when(client.scanTable(
      anyString(), any(DynamoDBQueryFilter.class),  // table name, query filter
      anyInt(), anyInt(),                           // segment, total segments
      any(Map.class), anyLong(),                    // exclusive start key, limit
      any(Reporter.class)))
      .thenReturn(scanResultRetryResult);
}
/**
 * Builds a scan result with one single-attribute item per key.
 *
 * <p>Each item maps {@code "hashKey"} to the key as a string attribute when {@code type} is
 * {@code "S"}, and as a number attribute otherwise. The result's scanned count equals the number
 * of items and its consumed capacity is 1 unit.
 *
 * @param hashKeys key values, one item is created for each
 * @param type     {@code "S"} for string keys; any other value produces number keys
 * @return the assembled scan result
 */
private ScanResult getHashKeyItems(String[] hashKeys, String type) {
  List<Map<String, AttributeValue>> items = new ArrayList<>();
  for (String key : hashKeys) {
    AttributeValue keyValue = type.equals("S")
        ? new AttributeValue(key)
        : new AttributeValue().withN(key);
    Map<String, AttributeValue> item = new HashMap<>();
    item.put("hashKey", keyValue);
    items.add(item);
  }

  ConsumedCapacity oneUnit = new ConsumedCapacity().withCapacityUnits(1d);
  return new ScanResult()
      .withScannedCount(items.size())
      .withItems(items)
      .withConsumedCapacity(oneUnit);
}
/**
 * Convenience overload: wraps the arguments in a {@link ScanRequest} and delegates to
 * {@code scan(ScanRequest)}.
 *
 * @param tableName       table to scan
 * @param attributesToGet attributes placed on the request
 * @param scanFilter      scan filter placed on the request
 * @return the result of the delegated scan
 */
@Override
public ScanResult scan(
        String tableName,
        List<String> attributesToGet,
        Map<String, Condition> scanFilter) throws AmazonServiceException, AmazonClientException {
    return scan(
            new ScanRequest()
                    .withTableName(tableName)
                    .withAttributesToGet(attributesToGet)
                    .withScanFilter(scanFilter));
}
/**
 * Scans this task's segment of the table page by page, handing each page to
 * {@code processScanResult}, until a page has no last evaluated key.
 *
 * <p>An {@code AmazonServiceException} ends the scan and its message is written to standard error.
 * The summary line with the scanned-item and request totals is printed in every case.
 */
@Override
public void run() {
    System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments
        + " segments " + itemLimit + " items at a time...");

    int totalScannedItemCount = 0;
    int totalScanRequestCount = 0;
    try {
        Map<String, AttributeValue> resumeKey = null;
        do {
            ScanRequest pageRequest = new ScanRequest().withTableName(tableName).withLimit(itemLimit)
                .withExclusiveStartKey(resumeKey).withTotalSegments(totalSegments).withSegment(segment);
            ScanResult page = client.scan(pageRequest);

            totalScanRequestCount++;
            totalScannedItemCount += page.getScannedCount();

            // print items returned from scan request
            processScanResult(segment, page);

            resumeKey = page.getLastEvaluatedKey();
        } while (resumeKey != null);
    }
    catch (AmazonServiceException ase) {
        System.err.println(ase.getMessage());
    }
    finally {
        System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of "
            + totalSegments + " of " + tableName + " with " + totalScanRequestCount + " scan requests");
    }
}
/**
 * Scans the segment described by {@code spec} and emits the mapped result of every page.
 *
 * <p>A single scan call returns at most one page, so the scan is repeated from each page's last
 * evaluated key until none is returned. One output element is emitted per page; previously only
 * the first page was read and the rest of the segment was silently dropped.
 *
 * @param spec read specification providing the client, the scan request, the segment id and the
 *             result mapper
 * @param out  receiver for the mapped scan results
 */
@ProcessElement
public void processElement(@Element Read<T> spec, OutputReceiver<T> out) {
  AmazonDynamoDB client = spec.getAwsClientsProvider().createDynamoDB();
  ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
  scanRequest.setSegment(spec.getSegmentId());

  while (true) {
    ScanResult scanResult = client.scan(scanRequest);
    out.output(spec.getScanResultMapperFn().apply(scanResult));

    if (scanResult.getLastEvaluatedKey() == null || scanResult.getLastEvaluatedKey().isEmpty()) {
      break; // segment fully read
    }
    // Continue right after the last key of the page just emitted.
    scanRequest.setExclusiveStartKey(scanResult.getLastEvaluatedKey());
  }
}
/**
 * Extracts the items of a scan result.
 *
 * @param scanResult the scan result; may be {@code null}
 * @return the result's items, or an empty list when {@code scanResult} is {@code null}
 */
@Override
public List<Map<String, AttributeValue>> apply(@Nullable ScanResult scanResult) {
  return scanResult == null
      ? Collections.emptyList()
      : scanResult.getItems();
}
/**
 * Writes {@code numOfItems} generated items to {@code tableName} in one batch-write call, scans
 * the table once, and asserts that the scan returned exactly that many items.
 *
 * @param tableName  table to populate and scan
 * @param numOfItems number of items to generate and expect back
 * @return the items returned by the scan
 */
static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) {
  dynamoDBClient.batchWriteItem(generateBatchWriteItemRequest(tableName, numOfItems));

  List<Map<String, AttributeValue>> stored =
      dynamoDBClient.scan(new ScanRequest().withTableName(tableName)).getItems();
  Assert.assertEquals(numOfItems, stored.size());
  return stored;
}
/**
 * Scans the {@code Books} table once, prints each returned item between start/end marker lines,
 * and returns the string form of the item list.
 *
 * @param request not read by this handler
 * @param context not read by this handler
 * @return {@code toString()} of the scanned items
 */
@Override
public String handleRequest(Book request, Context context) {
    ScanResult result = DynamoDBUtil.getClient().scan(new ScanRequest().withTableName("Books"));

    System.out.println("-- books listing start --");
    for (Map<String, AttributeValue> book : result.getItems()) {
        System.out.println(book);
    }
    System.out.println("-- books listing end --");

    return result.getItems().toString();
}