下面列出了 com.amazonaws.services.dynamodbv2.model.ConsumedCapacity 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a canned {@link ScanResult} containing one item per (hash key, range index) pair.
 *
 * <p>For every entry of {@code hashKeys}, {@code NUM_RANGE_KEYS_PER_HASH_KEY} items are
 * generated. Each item has a {@code "hashKey"} attribute and a {@code "rangeKey"} attribute;
 * the range key is set with {@code withN} to the range index prefixed with {@code "0"}.
 *
 * @param hashKeys hash key values to generate items for
 * @param hashType when equal to {@code "S"} the hash key is created through the
 *                 {@code AttributeValue(String)} constructor, otherwise through {@code withN}
 * @return a scan result whose scanned count is the number of generated items and whose
 *         consumed capacity is fixed at {@code 1} capacity unit
 */
private ScanResult getHashNumberRangeKeyItems(String[] hashKeys, String hashType) {
    List<Map<String, AttributeValue>> items = new ArrayList<>();
    for (String key : hashKeys) {
        // A primitive counter avoids boxing/unboxing an Integer on every iteration.
        for (int i = 0; i < NUM_RANGE_KEYS_PER_HASH_KEY; i++) {
            Map<String, AttributeValue> item = new HashMap<>();
            if (hashType.equals("S")) {
                item.put("hashKey", new AttributeValue(key));
            } else {
                item.put("hashKey", new AttributeValue().withN(key));
            }
            item.put("rangeKey", new AttributeValue().withN("0" + i));
            items.add(item);
        }
    }
    return new ScanResult()
        .withScannedCount(items.size())
        .withItems(items)
        .withConsumedCapacity(new ConsumedCapacity().withCapacityUnits(1d));
}
/**
 * Fetches the next page of the query (with exponential backoff) and folds its statistics
 * into the running totals kept by this instance.
 *
 * <p>When the page reports consumed capacity, the permit count for the following page and the
 * total capacity counter are updated from it. If the page carries no (or an empty) last
 * evaluated key the iteration is marked complete; otherwise that key becomes the exclusive
 * start key of the request.
 *
 * @return the fetched page wrapped together with {@code titanKey}
 * @throws BackendException declared by the overridden method
 */
@Override
public QueryResultWrapper next() throws BackendException {
    final Query pageQuery = new ExponentialBackoff.Query(request, delegate, permitsToConsume);
    final QueryResult page = pageQuery.runWithBackoff();

    final ConsumedCapacity capacity = page.getConsumedCapacity();
    if (capacity != null) {
        // Next request asks for (consumed - 1) permits, but never fewer than one.
        permitsToConsume = Math.max((int) (capacity.getCapacityUnits() - 1.0), 1);
        totalCapacityUnits += capacity.getCapacityUnits();
    }

    final boolean exhausted =
        page.getLastEvaluatedKey() == null || page.getLastEvaluatedKey().isEmpty();
    if (exhausted) {
        markComplete();
    } else {
        request.setExclusiveStartKey(page.getLastEvaluatedKey());
    }

    // Accumulate returned count, scanned count and the items of this page.
    returnedCount += page.getCount();
    scannedCount += page.getScannedCount();
    finalItemList.addAll(page.getItems());

    return new QueryResultWrapper(titanKey, page);
}
/**
 * Converts one record to a DynamoDB item, hands it to the batching client and updates the
 * write counters of this writer.
 *
 * <p>When {@code putBatch} returns a result, the consumed capacity units it reports are added
 * to {@code totalIOPSConsumed}, and the number of unprocessed write requests is carried over
 * as the new {@code batchSize}. When it returns {@code null}, only {@code batchSize} and
 * {@code totalItemsWritten} are incremented.
 *
 * @param key   record key, passed to {@code convertValueToDynamoDBItem}
 * @param value record value; must not be {@code null}
 * @throws RuntimeException if {@code value} is {@code null}
 * @throws IOException declared by the overridden method
 */
@Override
public void write(K key, V value) throws IOException {
    if (value == null) {
        throw new RuntimeException("Null record encountered. At least the key columns must be "
            + "specified.");
    }

    verifyInterval();
    if (progressable != null) {
        progressable.progress();
    }

    final DynamoDBItemWritable writable = convertValueToDynamoDBItem(key, value);
    final BatchWriteItemResult batchResult = client.putBatch(tableName, writable.getItem(),
        permissibleWritesPerSecond - writesPerSecond, reporter, deletionMode);

    batchSize++;
    totalItemsWritten++;

    if (batchResult == null) {
        return;
    }

    final List<ConsumedCapacity> capacities = batchResult.getConsumedCapacity();
    if (capacities != null) {
        for (ConsumedCapacity capacity : capacities) {
            final double units = capacity.getCapacityUnits();
            totalIOPSConsumed += units;
        }
    }

    // Whatever was not processed stays in the batch; the rest counts as written.
    int pending = 0;
    for (List<WriteRequest> leftover : batchResult.getUnprocessedItems().values()) {
        pending += leftover.size();
    }
    writesPerSecond += batchSize - pending;
    batchSize = pending;
}
/**
 * Builds a canned {@link ScanResult} with one single-attribute item per hash key.
 *
 * @param hashKeys hash key values, one generated item each
 * @param type     when equal to {@code "S"} the {@code "hashKey"} attribute is created through
 *                 the {@code AttributeValue(String)} constructor, otherwise through {@code withN}
 * @return a scan result whose scanned count is the number of items and whose consumed
 *         capacity is fixed at {@code 1} capacity unit
 */
private ScanResult getHashKeyItems(String[] hashKeys, String type) {
    final List<Map<String, AttributeValue>> rows = new ArrayList<>();
    for (int idx = 0; idx < hashKeys.length; idx++) {
        final String hashKey = hashKeys[idx];
        final AttributeValue keyAttribute = type.equals("S")
            ? new AttributeValue(hashKey)
            : new AttributeValue().withN(hashKey);
        final Map<String, AttributeValue> row = new HashMap<>();
        row.put("hashKey", keyAttribute);
        rows.add(row);
    }
    final ConsumedCapacity oneUnit = new ConsumedCapacity().withCapacityUnits(1d);
    return new ScanResult()
        .withScannedCount(rows.size())
        .withItems(rows)
        .withConsumedCapacity(oneUnit);
}
/**
 * Assembles a single {@link QueryResult} from the totals accumulated over all pages.
 *
 * @return a wrapper around {@code titanKey} and a result carrying the final item list, the
 *         returned and scanned counts, and a consumed capacity entry for the request's table
 */
@Override
protected QueryResultWrapper getMergedPages() {
    final ConsumedCapacity totalCapacity = new ConsumedCapacity()
        .withTableName(request.getTableName())
        .withCapacityUnits(totalCapacityUnits);
    final QueryResult merged = new QueryResult()
        .withItems(getFinalItemList())
        .withCount(returnedCount)
        .withScannedCount(scannedCount)
        .withConsumedCapacity(totalCapacity);
    return new QueryResultWrapper(titanKey, merged);
}
/**
 * Runs one scan request (with backoff), records how much capacity it consumed, advances the
 * request to the next page and then acquires that many permits from the rate limiter.
 *
 * <p>The consumed capacity is taken from the result when it is reported. Otherwise, if both
 * the scanned count and the count are present, it is estimated from the result size in bytes
 * and the per-read item size for the request's consistency mode. If neither is available,
 * {@code lastConsumedCapacity} keeps its previous value.
 *
 * @return the scan result paired with the segment of the request
 */
@Override
public SegmentedScanResult call() {
    final ScanResult result = runWithBackoff();

    final ConsumedCapacity cc = result.getConsumedCapacity();
    if (cc != null && cc.getCapacityUnits() != null) {
        lastConsumedCapacity = cc.getCapacityUnits().intValue();
    } else if (result.getScannedCount() != null && result.getCount() != null) {
        // getConsistentRead() is a nullable Boolean: unboxing it directly throws a
        // NullPointerException when the flag was never set on the request. An unset
        // flag is treated as "not strongly consistent".
        final boolean isConsistent = Boolean.TRUE.equals(request.getConsistentRead());
        final int itemSize = isConsistent
            ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE
            : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE;
        lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount()))
            * (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize);
    }

    final boolean morePages =
        result.getLastEvaluatedKey() != null && !result.getLastEvaluatedKey().isEmpty();
    hasNext = morePages;
    if (morePages) {
        request.setExclusiveStartKey(result.getLastEvaluatedKey());
    }

    // Only a positive permit count is requested from the rate limiter.
    if (lastConsumedCapacity > 0) {
        rateLimiter.acquire(lastConsumedCapacity);
    }
    return new SegmentedScanResult(result, request.getSegment());
}
/**
 * Batch writes the write request to the DynamoDB endpoint and THEN acquires
 * permits equal to the consumed capacity of the write.
 *
 * <p>The capacity units of every reported entry are truncated to an {@code int} and summed.
 * Entries without capacity units are skipped, and no permits are requested when the sum is
 * not positive. NOTE(review): {@code rateLimiter} is presumably a Guava {@code RateLimiter},
 * whose {@code acquire} rejects non-positive permit counts; confirm.
 *
 * @return always {@code null}
 */
@Override
public Void call() {
    final List<ConsumedCapacity> batchResult = runWithBackoff(batch);
    int consumedCapacity = 0;
    for (ConsumedCapacity capacity : batchResult) {
        // Capacity units are a nullable Double; skip entries that do not report them.
        if (capacity != null && capacity.getCapacityUnits() != null) {
            consumedCapacity += capacity.getCapacityUnits().intValue();
        }
    }
    if (consumedCapacity > 0) {
        rateLimiter.acquire(consumedCapacity);
    }
    return null;
}
/**
 * Writes to DynamoDBTable using an exponential backoff. If the
 * batchWriteItem returns unprocessed items then it will exponentially
 * backoff and retry the unprocessed items.
 *
 * <p>A retry happens only while the result still lists unprocessed items for
 * {@code tableName}; in that case the request is re-pointed at the unprocessed items, the
 * thread sleeps for {@code exponentialBackoffTime}, and that time is doubled up to
 * {@code BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME}. An interrupt received while
 * sleeping does not abort the retries; the interrupt flag is restored before returning.
 *
 * @param req the batch write request; its request items are replaced on every retry
 * @return the consumed capacity entries reported by all attempts (possibly empty)
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    final List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    boolean interrupted = false;
    try {
        boolean retry;
        do {
            final BatchWriteItemResult writeItemResult = client.batchWriteItem(req);

            // The consumed capacity list may be absent from the result; addAll(null) would throw.
            final List<ConsumedCapacity> reported = writeItemResult.getConsumedCapacity();
            if (reported != null) {
                consumedCapacities.addAll(reported);
            }

            final Map<String, List<WriteRequest>> unprocessedItems =
                writeItemResult.getUnprocessedItems();
            retry = unprocessedItems != null && unprocessedItems.get(tableName) != null;

            // Back off only when there is something to retry, so a fully successful
            // write neither sleeps nor inflates the backoff time.
            if (retry) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (retry);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Puts two single-attribute items into the test table with one batch write and checks that
 * the result reports as many consumed capacity entries as write requests were sent.
 */
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
    createTable();

    final String secondAttributeName = "Attribute2";
    final String secondAttributeValue = "AttributeValue2";

    // First item: the shared test attribute.
    Map<String, AttributeValue> firstItem = new HashMap<String, AttributeValue>();
    firstItem.put(TEST_ATTRIBUTE, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE));

    // Second item: a different attribute name and value.
    Map<String, AttributeValue> secondItem = new HashMap<String, AttributeValue>();
    secondItem.put(secondAttributeName, new AttributeValue().withS(secondAttributeValue));

    List<WriteRequest> puts = new ArrayList<WriteRequest>();
    puts.add(new WriteRequest().withPutRequest(new PutRequest().withItem(firstItem)));
    puts.add(new WriteRequest().withPutRequest(new PutRequest().withItem(secondItem)));

    Map<String, List<WriteRequest>> itemsByTable = new HashMap<String, List<WriteRequest>>();
    itemsByTable.put(TEST_TABLE_NAME, puts);

    BatchWriteItemResult outcome = dynamoDb.batchWriteItem(itemsByTable);
    List<ConsumedCapacity> reportedCapacities = outcome.getConsumedCapacity();

    assertThat(reportedCapacities.size(), equalTo(puts.size()));
}
/**
 * Sends one batch write containing a put into {@code table1Name} and a put plus a delete
 * into {@code table2Name}, printing the consumed capacity per table.
 *
 * <p>Unprocessed requests returned by the service are resent until none remain. An
 * {@link AmazonServiceException} is reported on {@code System.err} and not rethrown.
 */
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));
        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElastiCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        do {
            System.out.println("Making the request.");
            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
            }

            // Check for unprocessed keys which could happen if you exceed
            // provisioned throughput
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            // The next iteration resends only what the service did not process.
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);
    } catch (AmazonServiceException ase) {
        // This routine writes items; "retrieve" in the message was misleading.
        System.err.println("Failed to write items: ");
        ase.printStackTrace(System.err);
    }
}
/**
 * Validates and throttles every write request of the batch, then executes the batch write
 * and meters the consumed capacity it reports.
 *
 * <p>Each write request must carry exactly one of a put or a delete request. Puts are
 * throttled by the write capacity computed from the item size, deletes by an estimate for the
 * table. The API call itself is timed, and any exception it raises is translated by
 * {@code processDynamoDbApiException}.
 *
 * @param batchRequest the batch write request to validate and execute
 * @return the result returned by the client
 * @throws IllegalArgumentException if the running request count exceeds
 *         {@code BATCH_WRITE_MAX_NUMBER_OF_ITEMS}, or a write request has both or neither of
 *         put and delete set
 * @throws BackendException declared by this method; NOTE(review): presumably produced by
 *         {@code processDynamoDbApiException} when the API call fails; confirm
 */
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
    int totalRequests = 0;
    for (Entry<String, List<WriteRequest>> tableEntry : batchRequest.getRequestItems().entrySet()) {
        final String tableName = tableEntry.getKey();
        final List<WriteRequest> tableRequests = tableEntry.getValue();

        totalRequests += tableRequests.size();
        if (totalRequests > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
            throw new IllegalArgumentException("cant have more than 25 requests in a batchwrite");
        }

        for (final WriteRequest writeRequest : tableRequests) {
            final boolean isPut = writeRequest.getPutRequest() != null;
            final boolean isDelete = writeRequest.getDeleteRequest() != null;
            if (isPut == isDelete) {
                throw new IllegalArgumentException("Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
            }

            final String apiName = isPut ? PUT_ITEM : DELETE_ITEM;
            final int wcu = isPut
                ? computeWcu(calculateItemSizeInBytes(writeRequest.getPutRequest().getItem()))
                : estimateCapacityUnits(apiName, tableName);
            timedWriteThrottle(apiName, tableName, wcu);
        }
    }

    BatchWriteItemResult result;
    setUserAgent(batchRequest);
    final Timer.Context timer = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
    try {
        result = client.batchWriteItem(batchRequest);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
    } finally {
        // Stop the timer whether the call succeeded or failed.
        timer.stop();
    }

    final List<ConsumedCapacity> capacities = result.getConsumedCapacity();
    if (capacities != null) {
        for (ConsumedCapacity capacity : capacities) {
            meterConsumedCapacity(BATCH_WRITE_ITEM, capacity);
        }
    }
    return result;
}
/**
 * Marks the consumed-capacity meter for the given API and the table named in {@code ccu}
 * with the rounded capacity units. Does nothing when {@code ccu} is {@code null}.
 *
 * @param apiName name of the API the capacity was consumed by
 * @param ccu     consumed capacity to record; may be {@code null}
 */
private void meterConsumedCapacity(final String apiName, final ConsumedCapacity ccu) {
    if (ccu == null) {
        return;
    }
    getConsumedCapacityMeter(apiName, ccu.getTableName())
        .mark(Math.round(ccu.getCapacityUnits()));
}
/**
 * Issues a single batch write made of a forum put for {@code table1Name} and a thread put
 * plus a thread delete for {@code table2Name}, and prints the consumed capacity per table.
 *
 * <p>The request is repeated with the unprocessed items of the previous result until the
 * service reports none. An {@link AmazonServiceException} is printed to {@code System.err}
 * and swallowed.
 */
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));
        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        do {
            System.out.println("Making the request.");
            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
            }

            // Check for unprocessed keys which could happen if you exceed provisioned throughput
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            // Only the unprocessed requests are sent again on the next pass.
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);
    } catch (AmazonServiceException ase) {
        // The failing operation is a write, so the message says so instead of "retrieve".
        System.err.println("Failed to write items: ");
        ase.printStackTrace(System.err);
    }
}