下面列出了怎么用com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Helper method to handle unprocessed items items
* @param session process session
* @param keysToFlowFileMap map of flow db primary key to flow file
* @param table dynamodb table
* @param hashKeyName the hash key name
* @param hashKeyValueType the hash key value
* @param rangeKeyName the range key name
* @param rangeKeyValueType range key value
* @param outcome the write outcome
*/
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
BatchWriteItemResult result = outcome.getBatchWriteItemResult();
// Handle unprocessed items
List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
for ( WriteRequest request : unprocessedItems) {
Map<String,AttributeValue> item = getRequestItem(request);
Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
}
private void commitPartial(List<WriteRequest> list) {
Timer t = new Timer();
Map<String, List<WriteRequest>> map = new HashMap<>();
map.put(getTenant().getName(), list);
BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
int retry = 0;
while(result.getUnprocessedItems().size() > 0) {
if(retry == RETRY_SLEEPS.length) throw new RuntimeException("All retries failed");
m_logger.debug("Committing {} unprocessed items, retry: {}", result.getUnprocessedItems().size(), retry + 1);
try {
Thread.sleep(RETRY_SLEEPS[retry++]);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
}
m_logger.debug("Committed {} writes in {}", list.size(), t);
list.clear();
}
/**
* Helper method to handle unprocessed items items
* @param session process session
* @param keysToFlowFileMap map of flow db primary key to flow file
* @param table dynamodb table
* @param hashKeyName the hash key name
* @param hashKeyValueType the hash key value
* @param rangeKeyName the range key name
* @param rangeKeyValueType range key value
* @param outcome the write outcome
*/
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
BatchWriteItemResult result = outcome.getBatchWriteItemResult();
// Handle unprocessed items
List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
for ( WriteRequest request : unprocessedItems) {
Map<String,AttributeValue> item = getRequestItem(request);
Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
}
@Override
public void write(K key, V value) throws IOException {
if (value == null) {
throw new RuntimeException("Null record encountered. At least the key columns must be "
+ "specified.");
}
verifyInterval();
if (progressable != null) {
progressable.progress();
}
DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
permissibleWritesPerSecond - writesPerSecond, reporter, deletionMode);
batchSize++;
totalItemsWritten++;
if (result != null) {
if (result.getConsumedCapacity() != null) {
for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
double consumedUnits = consumedCapacity.getCapacityUnits();
totalIOPSConsumed += consumedUnits;
}
}
int unprocessedItems = 0;
for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
unprocessedItems += requests.size();
}
writesPerSecond += batchSize - unprocessedItems;
batchSize = unprocessedItems;
}
}
/**
* Writes to DynamoDBTable using an exponential backoff. If the
* batchWriteItem returns unprocessed items then it will exponentially
* backoff and retry the unprocessed items.
*/
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
BatchWriteItemResult writeItemResult = null;
List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
Map<String, List<WriteRequest>> unprocessedItems = null;
boolean interrupted = false;
try {
do {
writeItemResult = client.batchWriteItem(req);
unprocessedItems = writeItemResult.getUnprocessedItems();
consumedCapacities
.addAll(writeItemResult.getConsumedCapacity());
if (unprocessedItems != null) {
req.setRequestItems(unprocessedItems);
try {
Thread.sleep(exponentialBackoffTime);
} catch (InterruptedException ie) {
interrupted = true;
} finally {
exponentialBackoffTime *= 2;
if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
}
}
}
} while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
return consumedCapacities;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
createTable();
String TEST_ATTRIBUTE_2 = "Attribute2";
String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
item1.put(TEST_ATTRIBUTE, new AttributeValue()
.withS(TEST_ATTRIBUTE_VALUE));
WriteRequest writeRequest1 = new WriteRequest()
.withPutRequest(new PutRequest()
.withItem(item1));
writeRequests.add(writeRequest1);
Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
item2.put(TEST_ATTRIBUTE_2, new AttributeValue()
.withS(TEST_ATTRIBUTE_VALUE_2));
WriteRequest writeRequest2 = new WriteRequest()
.withPutRequest(new PutRequest()
.withItem(item2));
writeRequests.add(writeRequest2);
requestItems.put(TEST_TABLE_NAME, writeRequests);
BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();
assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
for (PutPointRequest putPointRequest : putPointRequests) {
long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());
PutRequest putRequest = putPointRequest.getPutRequest();
AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);
WriteRequest writeRequest = new WriteRequest(putRequest);
writeRequests.add(writeRequest);
}
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
requestItems.put(config.getTableName(), writeRequests);
batchItemRequest.setRequestItems(requestItems);
BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
return batchWritePointResult;
}
private List<Map<String, AttributeValue>> unproccessedItems(BatchWriteItemResult result,
Map<WriteRequest, Map<String, AttributeValue>> requestMap) {
Collection<List<WriteRequest>> items = result.getUnprocessedItems().values();
List<Map<String, AttributeValue>> unprocessed = new ArrayList<Map<String, AttributeValue>>();
// retrieve the unprocessed items
for (List<WriteRequest> list : items) {
for (WriteRequest request : list) {
unprocessed.add(requestMap.get(request));
}
}
return unprocessed;
}
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest batchWriteItemRequest) {
throw new UnsupportedOperationException();
}
@Override
public BatchWriteItemResult batchWriteItem(Map<String, List<WriteRequest>> requestItems) {
throw new UnsupportedOperationException();
}
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item,
long maxItemsPerBatch, Reporter reporter, boolean deletionMode)
throws UnsupportedEncodingException {
int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item);
if (itemSizeBytes > maxItemByteSize) {
throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize
+ ". Item with size of " + itemSizeBytes + " was given.");
}
maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch);
BatchWriteItemResult result = null;
if (writeBatchMap.containsKey(tableName)) {
boolean writeRequestsForTableAtLimit =
writeBatchMap.get(tableName).size() >= maxItemsPerBatch;
boolean totalSizeOfWriteBatchesOverLimit =
writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize;
if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) {
result = writeBatch(reporter, itemSizeBytes);
}
}
// writeBatchMap could be cleared from writeBatch()
List<WriteRequest> writeBatchList;
if (!writeBatchMap.containsKey(tableName)) {
writeBatchList = new ArrayList<>((int) maxItemsPerBatch);
writeBatchMap.put(tableName, writeBatchList);
} else {
writeBatchList = writeBatchMap.get(tableName);
}
log.debug("BatchWriteItem deletionMode " + deletionMode);
if (deletionMode) {
writeBatchList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(item)));
} else {
writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
}
writeBatchMapSizeBytes += itemSizeBytes;
return result;
}
private static void writeMultipleItemsBatchWrite() {
try {
// Create a map for the requests in the batch
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
// Create a PutRequest for a new Forum item
Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
forumItem.put("Threads", new AttributeValue().withN("0"));
List<WriteRequest> forumList = new ArrayList<WriteRequest>();
forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
requestItems.put(table1Name, forumList);
// Create a PutRequest for a new Thread item
Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
threadItem.put("Message", new AttributeValue().withS("ElastiCache Thread 1 message"));
threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
List<WriteRequest> threadList = new ArrayList<WriteRequest>();
threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
// Create a DeleteRequest for a Thread item
Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
requestItems.put(table2Name, threadList);
BatchWriteItemResult result;
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
do {
System.out.println("Making the request.");
batchWriteItemRequest.withRequestItems(requestItems);
result = client.batchWriteItem(batchWriteItemRequest);
// Print consumed capacity units
for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
String tableName = consumedCapacity.getTableName();
Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
}
// Check for unprocessed keys which could happen if you exceed
// provisioned throughput
System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
requestItems = result.getUnprocessedItems();
} while (result.getUnprocessedItems().size() > 0);
}
catch (AmazonServiceException ase) {
System.err.println("Failed to retrieve items: ");
ase.printStackTrace(System.err);
}
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
int count = 0;
for (Entry<String, List<WriteRequest>> entry : batchRequest.getRequestItems().entrySet()) {
final String tableName = entry.getKey();
final List<WriteRequest> requests = entry.getValue();
count += requests.size();
if (count > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
throw new IllegalArgumentException("cant have more than 25 requests in a batchwrite");
}
for (final WriteRequest request : requests) {
if ((request.getPutRequest() != null) == (request.getDeleteRequest() != null)) {
throw new IllegalArgumentException("Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
}
final int wcu;
final String apiName;
if (request.getPutRequest() != null) {
apiName = PUT_ITEM;
final int bytes = calculateItemSizeInBytes(request.getPutRequest().getItem());
wcu = computeWcu(bytes);
} else { //deleterequest
apiName = DELETE_ITEM;
wcu = estimateCapacityUnits(apiName, tableName);
}
timedWriteThrottle(apiName, tableName, wcu);
}
}
BatchWriteItemResult result;
setUserAgent(batchRequest);
final Timer.Context apiTimerContext = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
try {
result = client.batchWriteItem(batchRequest);
} catch (Exception e) {
throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
} finally {
apiTimerContext.stop();
}
if (result.getConsumedCapacity() != null) {
for (ConsumedCapacity ccu : result.getConsumedCapacity()) {
meterConsumedCapacity(BATCH_WRITE_ITEM, ccu);
}
}
return result;
}
private void writeBatch(final boolean async) {
final List<WriteRequest> batch;
if (batchedItems.size() <= NUM_ITEMS) {
batch = batchedItems;
} else {
batch = batchedItems.subList(0, NUM_ITEMS + 1);
}
final Map<String, List<WriteRequest>> writes = new HashMap<>();
writes.put(tableName, new ArrayList<>(batch));
if (async) {
/**
* To support asynchronous batch write a async handler is created Callbacks are provided for
* success and error. As there might be unprocessed items on failure, they are retried
* asynchronously Keep track of futures, so that they can be waited on during "flush"
*/
final BatchWriteItemRequest batchRequest = new BatchWriteItemRequest(writes);
final Future<BatchWriteItemResult> future =
client.batchWriteItemAsync(
batchRequest,
new AsyncHandler<BatchWriteItemRequest, BatchWriteItemResult>() {
@Override
public void onError(final Exception exception) {
LOGGER.warn(
"Unable to get response from Dynamo-Async Write " + exception.toString());
futureMap.remove(batchRequest);
return;
}
@Override
public void onSuccess(
final BatchWriteItemRequest request,
final BatchWriteItemResult result) {
retryAsync(result.getUnprocessedItems());
if (futureMap.remove(request) == null) {
LOGGER.warn(" Unable to delete BatchWriteRequest from futuresMap ");
}
}
});
futureMap.put(batchRequest, future);
} else {
final BatchWriteItemResult response =
client.batchWriteItem(new BatchWriteItemRequest(writes));
retry(response.getUnprocessedItems());
}
batch.clear();
}
private static void writeMultipleItemsBatchWrite() {
try {
// Create a map for the requests in the batch
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
// Create a PutRequest for a new Forum item
Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
forumItem.put("Threads", new AttributeValue().withN("0"));
List<WriteRequest> forumList = new ArrayList<WriteRequest>();
forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
requestItems.put(table1Name, forumList);
// Create a PutRequest for a new Thread item
Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
List<WriteRequest> threadList = new ArrayList<WriteRequest>();
threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
// Create a DeleteRequest for a Thread item
Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
requestItems.put(table2Name, threadList);
BatchWriteItemResult result;
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
do {
System.out.println("Making the request.");
batchWriteItemRequest.withRequestItems(requestItems);
result = client.batchWriteItem(batchWriteItemRequest);
// Print consumed capacity units
for(ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
String tableName = consumedCapacity.getTableName();
Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
}
// Check for unprocessed keys which could happen if you exceed provisioned throughput
System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
requestItems = result.getUnprocessedItems();
} while (result.getUnprocessedItems().size() > 0);
} catch (AmazonServiceException ase) {
System.err.println("Failed to retrieve items: ");
ase.printStackTrace(System.err);
}
}
public BatchWritePointResult(BatchWriteItemResult batchWriteItemResult) {
this.batchWriteItemResult = batchWriteItemResult;
}
public BatchWriteItemResult getBatchWriteItemResult() {
return batchWriteItemResult;
}
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest arg0)
throws AmazonServiceException, AmazonClientException {
throw new UnsupportedOperationException("Use the underlying client instance instead");
}
@Override
public BatchWriteItemResult batchWriteItem(
Map<String, List<WriteRequest>> requestItems)
throws AmazonServiceException, AmazonClientException {
throw new UnsupportedOperationException("Use the underlying client instance instead");
}
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest request) throws AmazonServiceException, AmazonClientException {
return getBackend().batchWriteItem(request);
}
@Override
public BatchWriteItemResult batchWriteItem(Map<String, List<WriteRequest>> requestItems) throws AmazonServiceException, AmazonClientException {
return getBackend().batchWriteItem(requestItems);
}
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest arg0)
throws AmazonServiceException, AmazonClientException {
throw new UnsupportedOperationException("Use the underlying client instance instead");
}
@Override
public BatchWriteItemResult batchWriteItem(
Map<String, List<WriteRequest>> requestItems)
throws AmazonServiceException, AmazonClientException {
throw new UnsupportedOperationException("Use the underlying client instance instead");
}