下面列出了怎么用com.amazonaws.services.dynamodbv2.model.WriteRequest的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Helper method to handle unprocessed items items
* @param session process session
* @param keysToFlowFileMap map of flow db primary key to flow file
* @param table dynamodb table
* @param hashKeyName the hash key name
* @param hashKeyValueType the hash key value
* @param rangeKeyName the range key name
* @param rangeKeyValueType range key value
* @param outcome the write outcome
*/
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
BatchWriteItemResult result = outcome.getBatchWriteItemResult();
// Handle unprocessed items
List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
for ( WriteRequest request : unprocessedItems) {
Map<String,AttributeValue> item = getRequestItem(request);
Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
}
@Before
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
@Before
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
@Test
public void testWriteDataToDynamo() {
final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
final PCollection<Void> output =
pipeline
.apply(Create.of(writeRequests))
.apply(
DynamoDBIO.<WriteRequest>write()
.withWriteRequestMapperFn(
(SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
writeRequest -> KV.of(tableName, writeRequest))
.withRetryConfiguration(
DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
.withAwsClientsProvider(
AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
final PCollection<Long> publishedResultsSize = output.apply(Count.globally());
PAssert.that(publishedResultsSize).containsInAnyOrder(0L);
pipeline.run().waitUntilFinish();
}
private void commitPartial(List<WriteRequest> list) {
Timer t = new Timer();
Map<String, List<WriteRequest>> map = new HashMap<>();
map.put(getTenant().getName(), list);
BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
int retry = 0;
while(result.getUnprocessedItems().size() > 0) {
if(retry == RETRY_SLEEPS.length) throw new RuntimeException("All retries failed");
m_logger.debug("Committing {} unprocessed items, retry: {}", result.getUnprocessedItems().size(), retry + 1);
try {
Thread.sleep(RETRY_SLEEPS[retry++]);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
}
m_logger.debug("Committed {} writes in {}", list.size(), t);
list.clear();
}
public void deleteRowsFromDataIndex(
final byte[][] dataIds,
final short adapterId,
final String typeName) {
final String tableName =
typeName + "_" + getQualifiedTableName(DataIndexUtils.DATA_ID_INDEX.getName());
final Iterator<byte[]> dataIdIterator = Arrays.stream(dataIds).iterator();
while (dataIdIterator.hasNext()) {
final List<WriteRequest> deleteRequests = new ArrayList<>();
int i = 0;
while (dataIdIterator.hasNext() && (i < MAX_ROWS_FOR_BATCHWRITER)) {
deleteRequests.add(
new WriteRequest(
new DeleteRequest(
Collections.singletonMap(
DynamoDBRow.GW_PARTITION_ID_KEY,
new AttributeValue().withB(ByteBuffer.wrap(dataIdIterator.next()))))));
i++;
}
client.batchWriteItem(Collections.singletonMap(tableName, deleteRequests));
}
}
@Before
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
@Before
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
/**
* Helper method to handle unprocessed items items
* @param session process session
* @param keysToFlowFileMap map of flow db primary key to flow file
* @param table dynamodb table
* @param hashKeyName the hash key name
* @param hashKeyValueType the hash key value
* @param rangeKeyName the range key name
* @param rangeKeyValueType range key value
* @param outcome the write outcome
*/
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
BatchWriteItemResult result = outcome.getBatchWriteItemResult();
// Handle unprocessed items
List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
for ( WriteRequest request : unprocessedItems) {
Map<String,AttributeValue> item = getRequestItem(request);
Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
}
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
PutRequest put = new PutRequest();
put.addItemEntry("hashS", new AttributeValue("h1"));
put.addItemEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(put);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
putRunner.enqueue("{\"hello\":\"world\"}".getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
DeleteRequest delete = new DeleteRequest();
delete.addKeyEntry("hashS", new AttributeValue("h1"));
delete.addKeyEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(delete);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
@Override
public void write(K key, V value) throws IOException {
if (value == null) {
throw new RuntimeException("Null record encountered. At least the key columns must be "
+ "specified.");
}
verifyInterval();
if (progressable != null) {
progressable.progress();
}
DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
permissibleWritesPerSecond - writesPerSecond, reporter, deletionMode);
batchSize++;
totalItemsWritten++;
if (result != null) {
if (result.getConsumedCapacity() != null) {
for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
double consumedUnits = consumedCapacity.getCapacityUnits();
totalIOPSConsumed += consumedUnits;
}
}
int unprocessedItems = 0;
for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
unprocessedItems += requests.size();
}
writesPerSecond += batchSize - unprocessedItems;
batchSize = unprocessedItems;
}
}
@Test
public void testPutBatchDeletionModeSuccessful() throws Exception {
Map<String, AttributeValue> item = ImmutableMap.of("",
new AttributeValue(Strings.repeat("a", (int) DEFAULT_MAX_ITEM_SIZE)));
client.putBatch("dummyTable", item, 1, null, true);
for (Map.Entry<String, List<WriteRequest>> entry: client.getWriteBatchMap().entrySet()) {
for (WriteRequest req: entry.getValue()) {
Assert.assertNotNull(req.getDeleteRequest());
Assert.assertNull(req.getPutRequest());
}
}
}
private Map<String, List<WriteRequest>> toWritesByTable(Iterator<SinkRecord> recordIterator) {
final Map<String, List<WriteRequest>> writesByTable = new HashMap<>();
for (int count = 0; recordIterator.hasNext() && count < config.batchSize; count++) {
final SinkRecord record = recordIterator.next();
final WriteRequest writeRequest = new WriteRequest(toPutRequest(record));
writesByTable.computeIfAbsent(tableName(record), k -> new ArrayList<>(config.batchSize)).add(writeRequest);
}
return writesByTable;
}
private static String makeMessage(Map<String, List<WriteRequest>> unprocessedItems) {
final StringBuilder msg = new StringBuilder("Unprocessed writes: {");
for (Map.Entry<String, List<WriteRequest>> e : unprocessedItems.entrySet()) {
msg.append(" ").append(e.getKey()).append("(").append(e.getValue().size()).append(")").append(" ");
}
msg.append("}");
return msg.toString();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
final KV<String, WriteRequest> writeRequest =
(KV<String, WriteRequest>) spec.getWriteItemMapperFn().apply(context.element());
batch.add(writeRequest);
if (batch.size() >= BATCH_SIZE) {
flushBatch();
}
}
static List<WriteRequest> generateWriteRequests(int numOfItem) {
List<WriteRequest> writeRequests = new ArrayList<>();
for (int i = 1; i <= numOfItem; i++) {
WriteRequest writeRequest = new WriteRequest();
writeRequest.setPutRequest(generatePutRequest("hashKeyDataStr_" + i, "1000" + i));
writeRequests.add(writeRequest);
}
return writeRequests;
}
@Test
public void testRetries() throws Throwable {
thrown.expectMessage("Error writing to DynamoDB");
final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
AmazonDynamoDB amazonDynamoDBMock = Mockito.mock(AmazonDynamoDB.class);
Mockito.when(amazonDynamoDBMock.batchWriteItem(Mockito.any(BatchWriteItemRequest.class)))
.thenThrow(new AmazonDynamoDBException("Service unavailable"));
pipeline
.apply(Create.of(writeRequests))
.apply(
DynamoDBIO.<WriteRequest>write()
.withWriteRequestMapperFn(
(SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
writeRequest -> KV.of(tableName, writeRequest))
.withRetryConfiguration(
DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10)))
.withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock)));
try {
pipeline.run().waitUntilFinish();
} catch (final Pipeline.PipelineExecutionException e) {
// check 3 retries were initiated by inspecting the log before passing on the exception
expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 2));
expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 3));
throw e.getCause();
}
fail("Pipeline is expected to fail because we were unable to write to DynamoDB.");
}
/**
* Writes to DynamoDBTable using an exponential backoff. If the
* batchWriteItem returns unprocessed items then it will exponentially
* backoff and retry the unprocessed items.
*/
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
BatchWriteItemResult writeItemResult = null;
List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
Map<String, List<WriteRequest>> unprocessedItems = null;
boolean interrupted = false;
try {
do {
writeItemResult = client.batchWriteItem(req);
unprocessedItems = writeItemResult.getUnprocessedItems();
consumedCapacities
.addAll(writeItemResult.getConsumedCapacity());
if (unprocessedItems != null) {
req.setRequestItems(unprocessedItems);
try {
Thread.sleep(exponentialBackoffTime);
} catch (InterruptedException ie) {
interrupted = true;
} finally {
exponentialBackoffTime *= 2;
if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
}
}
}
} while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
return consumedCapacities;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
/**
* Splits up a ScanResult into a list of BatchWriteItemRequests of size 25
* items or less each.
*/
public static List<BatchWriteItemRequest> splitResultIntoBatches(
ScanResult result, String tableName) {
List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();
BatchWriteItemRequest req = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
int i = 0;
while (it.hasNext()) {
PutRequest put = new PutRequest(it.next());
writeRequests.add(new WriteRequest(put));
i++;
if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
req.addRequestItemsEntry(tableName, writeRequests);
batches.add(req);
req = new BatchWriteItemRequest()
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
writeRequests = new LinkedList<WriteRequest>();
i = 0;
}
}
if (i > 0) {
req.addRequestItemsEntry(tableName, writeRequests);
batches.add(req);
}
return batches;
}
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
createTable();
String TEST_ATTRIBUTE_2 = "Attribute2";
String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
item1.put(TEST_ATTRIBUTE, new AttributeValue()
.withS(TEST_ATTRIBUTE_VALUE));
WriteRequest writeRequest1 = new WriteRequest()
.withPutRequest(new PutRequest()
.withItem(item1));
writeRequests.add(writeRequest1);
Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
item2.put(TEST_ATTRIBUTE_2, new AttributeValue()
.withS(TEST_ATTRIBUTE_VALUE_2));
WriteRequest writeRequest2 = new WriteRequest()
.withPutRequest(new PutRequest()
.withItem(item2));
writeRequests.add(writeRequest2);
requestItems.put(TEST_TABLE_NAME, writeRequests);
BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();
assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
@Override
public void write(final GeoWaveRow[] rows) {
final List<WriteRequest> mutations = new ArrayList<>();
for (final GeoWaveRow row : rows) {
mutations.addAll(rowToMutations(row, isDataIndex));
}
write(mutations);
}
public void write(final WriteRequest item) {
synchronized (batchedItems) {
batchedItems.add(item);
if (batchedItems.size() >= NUM_ITEMS) {
do {
writeBatch(ASYNC_WRITE);
} while (batchedItems.size() >= NUM_ITEMS);
}
}
}
private void retry(final Map<String, List<WriteRequest>> map) {
for (final Entry<String, List<WriteRequest>> requests : map.entrySet()) {
for (final WriteRequest r : requests.getValue()) {
if (r.getPutRequest() != null) {
client.putItem(requests.getKey(), r.getPutRequest().getItem());
}
}
}
}
private void retryAsync(final Map<String, List<WriteRequest>> map) {
for (final Entry<String, List<WriteRequest>> requests : map.entrySet()) {
for (final WriteRequest r : requests.getValue()) {
if (r.getPutRequest() != null) {
/**
* The code is pretty similar to retry. The only difference is retryAsync uses
* putItemAsync instead of putItem
*/
final PutItemRequest putRequest =
new PutItemRequest(requests.getKey(), r.getPutRequest().getItem());
final Future<PutItemResult> future =
client.putItemAsync(putRequest, new AsyncHandler<PutItemRequest, PutItemResult>() {
@Override
public void onError(final Exception exception) {
LOGGER.warn("Putitem Async failed in Dynamo");
futureMap.remove(putRequest);
}
@Override
public void onSuccess(final PutItemRequest request, final PutItemResult result) {
if (futureMap.remove(request) == null) {
LOGGER.warn("Unable to delete PutItemRequest from futuresMap ");
}
return;
}
});
futureMap.put(putRequest, future);
}
}
}
}
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
PutRequest put = new PutRequest();
put.addItemEntry("hashS", new AttributeValue("h1"));
put.addItemEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(put);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
putRunner.enqueue("{\"hello\":\"world\"}".getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
DeleteRequest delete = new DeleteRequest();
delete.addKeyEntry("hashS", new AttributeValue("h1"));
delete.addKeyEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(delete);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
for (PutPointRequest putPointRequest : putPointRequests) {
long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());
PutRequest putRequest = putPointRequest.getPutRequest();
AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);
WriteRequest writeRequest = new WriteRequest(putRequest);
writeRequests.add(writeRequest);
}
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
requestItems.put(config.getTableName(), writeRequests);
batchItemRequest.setRequestItems(requestItems);
BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
return batchWritePointResult;
}
@Override
public List<Map<String, AttributeValue>> emit(final UnmodifiableBuffer<Map<String, AttributeValue>> buffer)
throws IOException {
// Map of WriteRequests to records for reference
Map<WriteRequest, Map<String, AttributeValue>> requestMap =
new HashMap<WriteRequest, Map<String, AttributeValue>>();
List<Map<String, AttributeValue>> unproc = new ArrayList<Map<String, AttributeValue>>();
// Build a batch request with a record list
List<WriteRequest> rList = new ArrayList<WriteRequest>();
List<Map<String, AttributeValue>> resultList;
// Amazon DynamoDB only allows one operation per item in a bulk insertion (no duplicate items)
Set<Map<String, AttributeValue>> uniqueItems = uniqueItems(buffer.getRecords());
for (Map<String, AttributeValue> item : uniqueItems) {
WriteRequest wr = new WriteRequest().withPutRequest(new PutRequest().withItem(item));
// add to the map
requestMap.put(wr, item);
// add to the list of requests
rList.add(wr);
// Max of sixteen not to exceed maximum request size
if (rList.size() == 16) {
resultList = performBatchRequest(rList, requestMap);
unproc.addAll(resultList);
rList.clear();
}
}
resultList = performBatchRequest(rList, requestMap);
unproc.addAll(resultList);
LOG.info("Successfully emitted " + (buffer.getRecords().size() - unproc.size()) + " records into DynamoDB.");
return unproc;
}
private List<Map<String, AttributeValue>> unproccessedItems(BatchWriteItemResult result,
Map<WriteRequest, Map<String, AttributeValue>> requestMap) {
Collection<List<WriteRequest>> items = result.getUnprocessedItems().values();
List<Map<String, AttributeValue>> unprocessed = new ArrayList<Map<String, AttributeValue>>();
// retrieve the unprocessed items
for (List<WriteRequest> list : items) {
for (WriteRequest request : list) {
unprocessed.add(requestMap.get(request));
}
}
return unprocessed;
}