下面列出了怎么用com.amazonaws.services.dynamodbv2.model.PutRequest的API类实例代码及写法,或者点击链接到github查看源代码。
private void insert(ValueSource valueSource, Schema schema, Object value, PutRequest put) {
final AttributeValue attributeValue;
try {
attributeValue = schema == null
? AttributeValueConverter.toAttributeValueSchemaless(value)
: AttributeValueConverter.toAttributeValue(schema, value);
} catch (DataException e) {
log.error("Failed to convert record with schema={} value={}", schema, value, e);
throw e;
}
final String topAttributeName = valueSource.topAttributeName(config);
if (!topAttributeName.isEmpty()) {
put.addItemEntry(topAttributeName, attributeValue);
} else if (attributeValue.getM() != null) {
put.setItem(attributeValue.getM());
} else {
throw new ConnectException("No top attribute name configured for " + valueSource + ", and it could not be converted to Map: " + attributeValue);
}
}
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
PutRequest put = new PutRequest();
put.addItemEntry("hashS", new AttributeValue("h1"));
put.addItemEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(put);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
putRunner.enqueue("{\"hello\":\"world\"}".getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
private PutRequest toPutRequest(SinkRecord record) {
final PutRequest put = new PutRequest();
if (!config.ignoreRecordValue) {
insert(ValueSource.RECORD_VALUE, record.valueSchema(), record.value(), put);
}
if (!config.ignoreRecordKey) {
insert(ValueSource.RECORD_KEY, record.keySchema(), record.key(), put);
}
if (config.kafkaCoordinateNames != null) {
put.addItemEntry(config.kafkaCoordinateNames.topic, new AttributeValue().withS(record.topic()));
put.addItemEntry(config.kafkaCoordinateNames.partition, new AttributeValue().withN(String.valueOf(record.kafkaPartition())));
put.addItemEntry(config.kafkaCoordinateNames.offset, new AttributeValue().withN(String.valueOf(record.kafkaOffset())));
}
return put;
}
/**
* Splits up a ScanResult into a list of BatchWriteItemRequests of size 25
* items or less each.
*/
public static List<BatchWriteItemRequest> splitResultIntoBatches(
ScanResult result, String tableName) {
List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();
BatchWriteItemRequest req = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
int i = 0;
while (it.hasNext()) {
PutRequest put = new PutRequest(it.next());
writeRequests.add(new WriteRequest(put));
i++;
if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
req.addRequestItemsEntry(tableName, writeRequests);
batches.add(req);
req = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
writeRequests = new LinkedList<WriteRequest>();
i = 0;
}
}
if (i > 0) {
req.addRequestItemsEntry(tableName, writeRequests);
batches.add(req);
}
return batches;
}
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
createTable();
String TEST_ATTRIBUTE_2 = "Attribute2";
String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
item1.put(TEST_ATTRIBUTE, new AttributeValue()
.withS(TEST_ATTRIBUTE_VALUE));
WriteRequest writeRequest1 = new WriteRequest()
.withPutRequest(new PutRequest()
.withItem(item1));
writeRequests.add(writeRequest1);
Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
item2.put(TEST_ATTRIBUTE_2, new AttributeValue()
.withS(TEST_ATTRIBUTE_VALUE_2));
WriteRequest writeRequest2 = new WriteRequest()
.withPutRequest(new PutRequest()
.withItem(item2));
writeRequests.add(writeRequest2);
requestItems.put(TEST_TABLE_NAME, writeRequests);
BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();
assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
PutRequest put = new PutRequest();
put.addItemEntry("hashS", new AttributeValue("h1"));
put.addItemEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(put);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
putRunner.enqueue("{\"hello\":\"world\"}".getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
public PutPointRequest(GeoPoint geoPoint, AttributeValue rangeKeyValue) {
putItemRequest = new PutItemRequest();
putItemRequest.setItem(new HashMap<String, AttributeValue>());
putRequest = new PutRequest();
putRequest.setItem(new HashMap<String, AttributeValue>());
this.geoPoint = geoPoint;
this.rangeKeyValue = rangeKeyValue;
}
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
for (PutPointRequest putPointRequest : putPointRequests) {
long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());
PutRequest putRequest = putPointRequest.getPutRequest();
AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);
WriteRequest writeRequest = new WriteRequest(putRequest);
writeRequests.add(writeRequest);
}
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
requestItems.put(config.getTableName(), writeRequests);
batchItemRequest.setRequestItems(requestItems);
BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
return batchWritePointResult;
}
@Override
public List<Map<String, AttributeValue>> emit(final UnmodifiableBuffer<Map<String, AttributeValue>> buffer)
throws IOException {
// Map of WriteRequests to records for reference
Map<WriteRequest, Map<String, AttributeValue>> requestMap =
new HashMap<WriteRequest, Map<String, AttributeValue>>();
List<Map<String, AttributeValue>> unproc = new ArrayList<Map<String, AttributeValue>>();
// Build a batch request with a record list
List<WriteRequest> rList = new ArrayList<WriteRequest>();
List<Map<String, AttributeValue>> resultList;
// Amazon DynamoDB only allows one operation per item in a bulk insertion (no duplicate items)
Set<Map<String, AttributeValue>> uniqueItems = uniqueItems(buffer.getRecords());
for (Map<String, AttributeValue> item : uniqueItems) {
WriteRequest wr = new WriteRequest().withPutRequest(new PutRequest().withItem(item));
// add to the map
requestMap.put(wr, item);
// add to the list of requests
rList.add(wr);
// Max of sixteen not to exceed maximum request size
if (rList.size() == 16) {
resultList = performBatchRequest(rList, requestMap);
unproc.addAll(resultList);
rList.clear();
}
}
resultList = performBatchRequest(rList, requestMap);
unproc.addAll(resultList);
LOG.info("Successfully emitted " + (buffer.getRecords().size() - unproc.size()) + " records into DynamoDB.");
return unproc;
}
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item,
long maxItemsPerBatch, Reporter reporter, boolean deletionMode)
throws UnsupportedEncodingException {
int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item);
if (itemSizeBytes > maxItemByteSize) {
throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize
+ ". Item with size of " + itemSizeBytes + " was given.");
}
maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch);
BatchWriteItemResult result = null;
if (writeBatchMap.containsKey(tableName)) {
boolean writeRequestsForTableAtLimit =
writeBatchMap.get(tableName).size() >= maxItemsPerBatch;
boolean totalSizeOfWriteBatchesOverLimit =
writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize;
if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) {
result = writeBatch(reporter, itemSizeBytes);
}
}
// writeBatchMap could be cleared from writeBatch()
List<WriteRequest> writeBatchList;
if (!writeBatchMap.containsKey(tableName)) {
writeBatchList = new ArrayList<>((int) maxItemsPerBatch);
writeBatchMap.put(tableName, writeBatchList);
} else {
writeBatchList = writeBatchMap.get(tableName);
}
log.debug("BatchWriteItem deletionMode " + deletionMode);
if (deletionMode) {
writeBatchList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(item)));
} else {
writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
}
writeBatchMapSizeBytes += itemSizeBytes;
return result;
}
private static void writeMultipleItemsBatchWrite() {
try {
// Begin syntax extract
// Create a map for the requests in the batch
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
// Create a PutRequest for a new Forum item
Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
forumItem.put("Threads", new AttributeValue().withN("0"));
List<WriteRequest> forumList = new ArrayList<WriteRequest>();
forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
requestItems.put("Forum", forumList);
// Create a PutRequest for a new Thread item
Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
List<WriteRequest> threadList = new ArrayList<WriteRequest>();
threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
// Create a DeleteRequest for a Thread item
Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
threadDeleteKey.put("ForumName", new AttributeValue().withS("Some partition key value"));
threadDeleteKey.put("Subject", new AttributeValue().withS("Some sort key value"));
threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
requestItems.put("Thread", threadList);
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
System.out.println("Making the request.");
batchWriteItemRequest.withRequestItems(requestItems);
client.batchWriteItem(batchWriteItemRequest);
// End syntax extract
}
catch (AmazonServiceException ase) {
System.err.println("Failed to retrieve items: ");
ase.printStackTrace(System.err);
}
}
private static void writeMultipleItemsBatchWrite() {
try {
// Create a map for the requests in the batch
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
// Create a PutRequest for a new Forum item
Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
forumItem.put("Threads", new AttributeValue().withN("0"));
List<WriteRequest> forumList = new ArrayList<WriteRequest>();
forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
requestItems.put(table1Name, forumList);
// Create a PutRequest for a new Thread item
Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
threadItem.put("Message", new AttributeValue().withS("ElastiCache Thread 1 message"));
threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
List<WriteRequest> threadList = new ArrayList<WriteRequest>();
threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
// Create a DeleteRequest for a Thread item
Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
requestItems.put(table2Name, threadList);
BatchWriteItemResult result;
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
do {
System.out.println("Making the request.");
batchWriteItemRequest.withRequestItems(requestItems);
result = client.batchWriteItem(batchWriteItemRequest);
// Print consumed capacity units
for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
String tableName = consumedCapacity.getTableName();
Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
}
// Check for unprocessed keys which could happen if you exceed
// provisioned throughput
System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
requestItems = result.getUnprocessedItems();
} while (result.getUnprocessedItems().size() > 0);
}
catch (AmazonServiceException ase) {
System.err.println("Failed to retrieve items: ");
ase.printStackTrace(System.err);
}
}
private static PutRequest generatePutRequest(String hashKeyData, String rangeKeyData) {
PutRequest putRequest = new PutRequest();
putRequest.addItemEntry(ATTR_NAME_1, new AttributeValue(hashKeyData));
putRequest.addItemEntry(ATTR_NAME_2, new AttributeValue().withN(rangeKeyData));
return putRequest;
}
private static void writeMultipleItemsBatchWrite() {
try {
// Begin syntax extract
// Create a map for the requests in the batch
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
// Create a PutRequest for a new Forum item
Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
forumItem.put("Threads", new AttributeValue().withN("0"));
List<WriteRequest> forumList = new ArrayList<WriteRequest>();
forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
requestItems.put("Forum", forumList);
// Create a PutRequest for a new Thread item
Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
List<WriteRequest> threadList = new ArrayList<WriteRequest>();
threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
// Create a DeleteRequest for a Thread item
Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
threadDeleteKey.put("ForumName", new AttributeValue().withS("Some hash attribute value"));
threadDeleteKey.put("Subject", new AttributeValue().withS("Some range attribute value"));
threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
requestItems.put("Thread", threadList);
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
System.out.println("Making the request.");
batchWriteItemRequest.withRequestItems(requestItems);
client.batchWriteItem(batchWriteItemRequest);
// End syntax extract
} catch (AmazonServiceException ase) {
System.err.println("Failed to retrieve items: ");
ase.printStackTrace(System.err);
}
}
private static void writeMultipleItemsBatchWrite() {
try {
// Create a map for the requests in the batch
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
// Create a PutRequest for a new Forum item
Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
forumItem.put("Threads", new AttributeValue().withN("0"));
List<WriteRequest> forumList = new ArrayList<WriteRequest>();
forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
requestItems.put(table1Name, forumList);
// Create a PutRequest for a new Thread item
Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
List<WriteRequest> threadList = new ArrayList<WriteRequest>();
threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
// Create a DeleteRequest for a Thread item
Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
requestItems.put(table2Name, threadList);
BatchWriteItemResult result;
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
do {
System.out.println("Making the request.");
batchWriteItemRequest.withRequestItems(requestItems);
result = client.batchWriteItem(batchWriteItemRequest);
// Print consumed capacity units
for(ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
String tableName = consumedCapacity.getTableName();
Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
}
// Check for unprocessed keys which could happen if you exceed provisioned throughput
System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
requestItems = result.getUnprocessedItems();
} while (result.getUnprocessedItems().size() > 0);
} catch (AmazonServiceException ase) {
System.err.println("Failed to retrieve items: ");
ase.printStackTrace(System.err);
}
}
public PutRequest getPutRequest() {
return putRequest;
}