下面列出了怎么用com.amazonaws.services.dynamodbv2.model.Record的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Builds the DynamoDB write request that mirrors a single DynamoDB Streams record.
 * <p>
 * INSERT and MODIFY events produce a {@link PutItemRequest} carrying the record's new image; REMOVE events
 * produce a {@link DeleteItemRequest} carrying the record's keys. The event name is compared
 * case-insensitively. Any other event name is logged as a warning and yields {@code null}.
 *
 * @param record
 *            The DynamoDB Stream record containing information about the update to a DynamoDB table
 * @return A DynamoDB request based on the DynamoDB Stream record, or {@code null} when the record's
 *         operation type is not one of INSERT, MODIFY or REMOVE
 */
private AmazonWebServiceRequest createRequest(final Record record) {
    final String operation = record.getEventName();
    final boolean isUpsert = operation.equalsIgnoreCase(OperationType.INSERT.toString())
        || operation.equalsIgnoreCase(OperationType.MODIFY.toString());

    final AmazonWebServiceRequest writeRequest;
    if (isUpsert) {
        // INSERT / MODIFY: overwrite the destination item with the new image
        final PutItemRequest put = new PutItemRequest();
        put.setItem(record.getDynamodb().getNewImage());
        put.setTableName(getTableName());
        writeRequest = put;
    } else if (operation.equalsIgnoreCase(OperationType.REMOVE.toString())) {
        // REMOVE: delete the destination item identified by the record's keys
        final DeleteItemRequest delete = new DeleteItemRequest();
        delete.setKey(record.getDynamodb().getKeys());
        delete.setTableName(getTableName());
        writeRequest = delete;
    } else {
        // Only reachable if DynamoDB Streams introduces or renames an operation type
        log.warn("Unsupported operation type detected: " + operation + ". Record: " + record);
        return null;
    }

    // Tag every generated request with this library's user agent
    writeRequest.getRequestClientOptions().appendUserAgent(USER_AGENT);
    return writeRequest;
}
/**
 * {@inheritDoc}
 * <p>
 * Logs each record that could not be emitted and, when a CloudWatch client is available, publishes the
 * number of failed records as a count metric under the application's namespace.
 *
 * @throws IllegalStateException
 *             if called with a non-empty list after the emitter has been shut down
 */
@Override
public void fail(final List<Record> records) {
    if (isShutdown) {
        if (!records.isEmpty()) {
            throw new IllegalStateException("Emitter fail method called after shutdown method was called.");
        }
        // Tolerated, although unexpected: there is nothing to report
        log.warn("Emitter fail method called after shutdown method was called. Continuing because list is empty");
        return;
    }

    for (final Record failedRecord : records) {
        log.error("Could not emit record: " + failedRecord);
    }

    final AmazonCloudWatchAsync cloudWatchClient = CLOUDWATCH.get();
    if (null == cloudWatchClient) {
        return;
    }

    final double failedCount = records.size();
    final MetricDatum failedDatum = new MetricDatum()
        .withMetricName(RECORDS_FAILED)
        .withValue(failedCount)
        .withUnit(StandardUnit.Count)
        .withTimestamp(new Date());
    final PutMetricDataRequest metricRequest = new PutMetricDataRequest()
        .withNamespace(applicationName)
        .withMetricData(failedDatum);
    cloudWatchClient.putMetricDataAsync(metricRequest);
}
/**
 * Buffers a single INSERT record, then checks that emitting it results in exactly one
 * {@code putItemAsync} call on the mocked client and an empty list of failed records.
 */
@SuppressWarnings("deprecation")
@Test
public void insertTest() throws Exception {
    // Fill the buffer with one INSERT record and sanity-check its state
    buffer.clear();
    final int sizeBytes = ITEM1_INSERT.getDynamodb().getSizeBytes().intValue();
    final String sequenceNumber = ITEM1_INSERT.getDynamodb().getSequenceNumber();
    buffer.consumeRecord(ITEM1_INSERT, sizeBytes, sequenceNumber);
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    final List<Record> bufferedRecords = buffer.getRecords();
    assertEquals(1, bufferedRecords.size());
    assertTrue(bufferedRecords.contains(ITEM1_INSERT));

    // Record the expected interactions with the mocked DynamoDB client
    resetAll(DYNAMODB);
    expectNew(AmazonDynamoDBAsyncClient.class,
        new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class},
        anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class))
        .andReturn(DYNAMODB);
    DYNAMODB.putItemAsync(EasyMock.anyObject(PutItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);

    // Emit the buffered record and verify nothing failed
    final IEmitter<Record> emitter = createEmitterInstance();
    final List<Record> failed = emitter.emit(new UnmodifiableBuffer<Record>(buffer));
    assertTrue(failed.isEmpty());
    verifyAll();
}
/**
 * Buffers a single MODIFY record, then checks that emitting it results in exactly one
 * {@code putItemAsync} call on the mocked client and an empty list of failed records.
 */
@SuppressWarnings("deprecation")
@Test
public void modifyTest() throws Exception {
    // Fill the buffer with one MODIFY record and sanity-check its state
    buffer.clear();
    final int sizeBytes = ITEM1_MODIFY.getDynamodb().getSizeBytes().intValue();
    final String sequenceNumber = ITEM1_MODIFY.getDynamodb().getSequenceNumber();
    buffer.consumeRecord(ITEM1_MODIFY, sizeBytes, sequenceNumber);
    assertEquals(ITEM1_MODIFY.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_MODIFY.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    final List<Record> bufferedRecords = buffer.getRecords();
    assertEquals(1, bufferedRecords.size());
    assertTrue(bufferedRecords.contains(ITEM1_MODIFY));

    // Record the expected interactions with the mocked DynamoDB client
    resetAll(DYNAMODB);
    DYNAMODB.putItemAsync(EasyMock.anyObject(PutItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER);
    expectNew(AmazonDynamoDBAsyncClient.class,
        new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class},
        anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class))
        .andReturn(DYNAMODB);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);

    // Emit the buffered record and verify nothing failed
    final IEmitter<Record> emitter = createEmitterInstance();
    final List<Record> failed = emitter.emit(new UnmodifiableBuffer<Record>(buffer));
    assertTrue(failed.isEmpty());
    verifyAll();
}
/**
 * Buffers a single REMOVE record, then checks that emitting it results in exactly one
 * {@code deleteItemAsync} call on the mocked client and an empty list of failed records.
 */
@SuppressWarnings("deprecation")
@Test
public void removeTest() throws Exception {
    // Fill the buffer with one REMOVE record and sanity-check its state
    buffer.clear();
    final int sizeBytes = ITEM1_REMOVE.getDynamodb().getSizeBytes().intValue();
    final String sequenceNumber = ITEM1_REMOVE.getDynamodb().getSequenceNumber();
    buffer.consumeRecord(ITEM1_REMOVE, sizeBytes, sequenceNumber);
    assertEquals(ITEM1_REMOVE.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_REMOVE.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    final List<Record> bufferedRecords = buffer.getRecords();
    assertEquals(1, bufferedRecords.size());
    assertTrue(bufferedRecords.contains(ITEM1_REMOVE));

    // Record the expected interactions with the mocked DynamoDB client
    resetAll(DYNAMODB);
    DYNAMODB.deleteItemAsync(anyObject(DeleteItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER);
    expectNew(AmazonDynamoDBAsyncClient.class,
        new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class},
        anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class))
        .andReturn(DYNAMODB);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);

    // Emit the buffered record and verify nothing failed
    final IEmitter<Record> emitter = createEmitterInstance();
    final List<Record> failed = emitter.emit(new UnmodifiableBuffer<Record>(buffer));
    assertTrue(failed.isEmpty());
    verifyAll();
}
/**
 * Buffers two INSERT records for different items, then checks that emitting them results in two
 * {@code putItemAsync} calls on the mocked client and an empty list of failed records.
 */
@SuppressWarnings("deprecation")
@Test
public void multipleRecordsEmitsTest() throws Exception {
    // Fill the buffer with two INSERT records and sanity-check its state
    buffer.clear();
    final String firstSequenceNumber = ITEM1_INSERT.getDynamodb().getSequenceNumber();
    final String secondSequenceNumber = ITEM2_INSERT.getDynamodb().getSequenceNumber();
    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), firstSequenceNumber);
    buffer.consumeRecord(ITEM2_INSERT, ITEM2_INSERT.getDynamodb().getSizeBytes().intValue(), secondSequenceNumber);
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM2_INSERT.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    final List<Record> bufferedRecords = buffer.getRecords();
    assertEquals(2, bufferedRecords.size());
    assertTrue(bufferedRecords.contains(ITEM1_INSERT));
    assertTrue(bufferedRecords.contains(ITEM2_INSERT));

    // Record the expected interactions with the mocked DynamoDB client
    resetAll(DYNAMODB);
    DYNAMODB.putItemAsync(EasyMock.anyObject(PutItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER).times(2);
    expectNew(AmazonDynamoDBAsyncClient.class,
        new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class},
        anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class))
        .andReturn(DYNAMODB);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);

    // Emit the buffered records and verify nothing failed
    final IEmitter<Record> emitter = createEmitterInstance();
    final List<Record> failed = emitter.emit(new UnmodifiableBuffer<Record>(buffer));
    assertTrue(failed.isEmpty());
    verifyAll();
}
/**
 * Consumes INSERT records for two different items and checks that both are kept, that the
 * sequence number bounds span the two records, and that the buffer reports it should flush.
 */
@Test
public void multipleRecordsTest() {
    final DynamoDBStreamsConnectorConfiguration configuration =
        new DynamoDBStreamsConnectorConfiguration(new Properties(), null);
    final DynamoDBBuffer recordBuffer = new DynamoDBBuffer(configuration);

    recordBuffer.consumeRecord(ITEM1_INSERT,
        ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(),
        ITEM1_INSERT.getDynamodb().getSequenceNumber());
    recordBuffer.consumeRecord(ITEM2_INSERT,
        ITEM2_INSERT.getDynamodb().getSizeBytes().intValue(),
        ITEM2_INSERT.getDynamodb().getSequenceNumber());

    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), recordBuffer.getFirstSequenceNumber());
    assertEquals(ITEM2_INSERT.getDynamodb().getSequenceNumber(), recordBuffer.getLastSequenceNumber());

    final List<Record> contents = recordBuffer.getRecords();
    assertEquals(2, contents.size());
    assertTrue(contents.contains(ITEM1_INSERT));
    assertTrue(contents.contains(ITEM2_INSERT));
    assertTrue(recordBuffer.shouldFlush());
}
/**
 * Consumes an INSERT followed by a MODIFY for the same item and checks that only the MODIFY
 * record remains buffered while the sequence number bounds still cover both records.
 */
@Test
public void dedupSanityTest() {
    final DynamoDBStreamsConnectorConfiguration configuration =
        new DynamoDBStreamsConnectorConfiguration(new Properties(), null);
    final DynamoDBBuffer recordBuffer = new DynamoDBBuffer(configuration);

    recordBuffer.consumeRecord(ITEM1_INSERT,
        ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(),
        ITEM1_INSERT.getDynamodb().getSequenceNumber());
    recordBuffer.consumeRecord(ITEM1_MODIFY,
        ITEM1_MODIFY.getDynamodb().getSizeBytes().intValue(),
        ITEM1_MODIFY.getDynamodb().getSequenceNumber());

    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), recordBuffer.getFirstSequenceNumber());
    assertEquals(ITEM1_MODIFY.getDynamodb().getSequenceNumber(), recordBuffer.getLastSequenceNumber());

    final List<Record> contents = recordBuffer.getRecords();
    assertEquals(1, contents.size());
    assertTrue(contents.contains(ITEM1_MODIFY));
    assertFalse(contents.contains(ITEM1_INSERT));
    assertTrue(recordBuffer.shouldFlush());
}
/**
 * Consumes an INSERT and a MODIFY for each of two items and checks that only the two MODIFY
 * records remain buffered while the sequence number bounds run from the first to the last record.
 */
@Test
public void dedupMultipleRecordsTest() {
    final DynamoDBStreamsConnectorConfiguration configuration =
        new DynamoDBStreamsConnectorConfiguration(new Properties(), null);
    final DynamoDBBuffer recordBuffer = new DynamoDBBuffer(configuration);

    recordBuffer.consumeRecord(ITEM1_INSERT,
        ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(),
        ITEM1_INSERT.getDynamodb().getSequenceNumber());
    recordBuffer.consumeRecord(ITEM1_MODIFY,
        ITEM1_MODIFY.getDynamodb().getSizeBytes().intValue(),
        ITEM1_MODIFY.getDynamodb().getSequenceNumber());
    recordBuffer.consumeRecord(ITEM2_INSERT,
        ITEM2_INSERT.getDynamodb().getSizeBytes().intValue(),
        ITEM2_INSERT.getDynamodb().getSequenceNumber());
    recordBuffer.consumeRecord(ITEM2_MODIFY,
        ITEM2_MODIFY.getDynamodb().getSizeBytes().intValue(),
        ITEM2_MODIFY.getDynamodb().getSequenceNumber());

    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), recordBuffer.getFirstSequenceNumber());
    assertEquals(ITEM2_MODIFY.getDynamodb().getSequenceNumber(), recordBuffer.getLastSequenceNumber());

    final List<Record> contents = recordBuffer.getRecords();
    assertEquals(2, contents.size());
    assertTrue(contents.contains(ITEM1_MODIFY));
    assertFalse(contents.contains(ITEM1_INSERT));
    assertTrue(contents.contains(ITEM2_MODIFY));
    assertFalse(contents.contains(ITEM2_INSERT));
    assertTrue(recordBuffer.shouldFlush());
}
/**
 * Creates the replication emitter for this pipeline.
 *
 * @param configuration
 *            connector configuration; must be a {@link DynamoDBStreamsConnectorConfiguration}
 * @return a new {@link DynamoDBReplicationEmitter} built from the configuration
 * @throws IllegalArgumentException
 *             if the configuration is not a {@link DynamoDBStreamsConnectorConfiguration}
 */
@Override
public IEmitter<Record> getEmitter(final KinesisConnectorConfiguration configuration) {
    if (!(configuration instanceof DynamoDBStreamsConnectorConfiguration)) {
        throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
    }
    final DynamoDBStreamsConnectorConfiguration streamsConfiguration =
        (DynamoDBStreamsConnectorConfiguration) configuration;
    return new DynamoDBReplicationEmitter(streamsConfiguration);
}
/**
 * Creates the record buffer for this pipeline.
 *
 * @param configuration
 *            connector configuration; must be a {@link DynamoDBStreamsConnectorConfiguration}
 * @return a new {@link DynamoDBBuffer} built from the configuration
 * @throws IllegalArgumentException
 *             if the configuration is not a {@link DynamoDBStreamsConnectorConfiguration}
 */
@Override
public IBuffer<Record> getBuffer(final KinesisConnectorConfiguration configuration) {
    if (!(configuration instanceof DynamoDBStreamsConnectorConfiguration)) {
        throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
    }
    final DynamoDBStreamsConnectorConfiguration streamsConfiguration =
        (DynamoDBStreamsConnectorConfiguration) configuration;
    return new DynamoDBBuffer(streamsConfiguration);
}
/**
 * {@inheritDoc}
 * <p>
 * When the Kinesis record is a {@link RecordAdapter}, the wrapped DynamoDB Streams record is returned
 * directly. Otherwise the record's data is decoded to a string using {@code ENCODING} and parsed
 * into a {@link Record} with {@code MAPPER}.
 */
@Override
public Record toClass(final com.amazonaws.services.kinesis.model.Record record) throws IOException {
    if (!(record instanceof RecordAdapter)) {
        // Not an adapter: parse the payload of the Kinesis record itself
        final byte[] payload = record.getData().array();
        final String json = new String(payload, ENCODING);
        return MAPPER.readValue(json, Record.class);
    }
    final RecordAdapter adapter = (RecordAdapter) record;
    return adapter.getInternalObject();
}
/**
 * {@inheritDoc}
 * <p>
 * Stores the record under its DynamoDB key, updates the first/last sequence number bounds,
 * increments the processed-record counter and emits CloudWatch metrics. The {@code recordBytes}
 * argument is not used by this implementation.
 */
@Override
public void consumeRecord(final Record record, final int recordBytes, final String sequenceNumber) {
    // Keyed by the item's DynamoDB key so that records for the same item are deduplicated
    getBuffer().put(record.getDynamodb().getKeys(), record);

    // Maintain the sequence number range covered by the buffer
    setLastSequenceNumber(sequenceNumber);
    final boolean noFirstSequenceNumberYet = (null == getFirstSequenceNumber());
    if (noFirstSequenceNumberYet) {
        setFirstSequenceNumber(getLastSequenceNumber());
    }

    setProcessedRecords(1 + getProcessedRecords());
    emitCloudWatchMetrics();
}
/**
 * Consumes a single INSERT record and checks that it is buffered, that both sequence number
 * bounds equal the record's sequence number, and that the buffer reports it should flush.
 */
@Test
public void sanityTest() {
    final DynamoDBStreamsConnectorConfiguration configuration =
        new DynamoDBStreamsConnectorConfiguration(new Properties(), null);
    final DynamoDBBuffer recordBuffer = new DynamoDBBuffer(configuration);

    recordBuffer.consumeRecord(ITEM1_INSERT,
        ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(),
        ITEM1_INSERT.getDynamodb().getSequenceNumber());

    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), recordBuffer.getFirstSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), recordBuffer.getLastSequenceNumber());

    final List<Record> contents = recordBuffer.getRecords();
    assertEquals(1, contents.size());
    assertTrue(contents.contains(ITEM1_INSERT));
    assertTrue(recordBuffer.shouldFlush());
}
/**
 * Builds the emitter under test from fixed test properties, static dummy credentials and an
 * async DynamoDB client configured with no error retries and a fixed-size thread pool.
 *
 * @return a new {@link DynamoDBReplicationEmitter} for use in tests
 */
@Override
protected IEmitter<Record> createEmitterInstance() {
    final Properties testProperties = new Properties();
    testProperties.setProperty("APP_NAME", "TEST");
    testProperties.setProperty("DYNAMODB_ENDPOINT", "ENDPOINT");
    testProperties.setProperty("REGION_NAME", "REGION");
    testProperties.setProperty("DYNAMODB_DATA_TABLE_NAME", "TABLE");

    final AWSStaticCredentialsProvider credentials =
        new AWSStaticCredentialsProvider(new BasicAWSCredentials("Access", "Secret"));
    final DynamoDBStreamsConnectorConfiguration configuration =
        new DynamoDBStreamsConnectorConfiguration(testProperties, credentials);
    final ClientConfiguration noRetryConfiguration = new ClientConfiguration().withMaxErrorRetry(0);
    final AmazonDynamoDBAsyncClient client =
        new AmazonDynamoDBAsyncClient(credentials, noRetryConfiguration, Executors.newFixedThreadPool(MAX_THREADS));

    return new DynamoDBReplicationEmitter(configuration, client, null);
}
/**
 * Parses the raw message bytes into a DynamoDB Streams {@link Record} using {@code MAPPER}.
 * The partition key, sequence number, arrival timestamp, stream and shard id are not used.
 *
 * @return the record parsed from {@code message}
 * @throws IOException
 *             if the bytes cannot be read as a {@link Record}
 */
@Override
public Record deserialize(byte[] message, String partitionKey, String seqNum,
        long approxArrivalTimestamp, String stream, String shardId) throws IOException {
    final Record parsedRecord = MAPPER.readValue(message, Record.class);
    return parsedRecord;
}
/**
 * Describes the element type produced by this schema.
 *
 * @return type information for {@link Record}
 */
@Override
public TypeInformation<Record> getProducedType() {
    final TypeInformation<Record> recordType = TypeInformation.of(Record.class);
    return recordType;
}
/**
 * Converts the raw message bytes into a DynamoDB Streams {@link Record} via {@code MAPPER}.
 * Only {@code message} is consulted; the remaining arguments are ignored.
 *
 * @return the record read from {@code message}
 * @throws IOException
 *             if the bytes cannot be read as a {@link Record}
 */
@Override
public Record deserialize(byte[] message, String partitionKey, String seqNum,
        long approxArrivalTimestamp, String stream, String shardId) throws IOException {
    final Record streamRecord = MAPPER.readValue(message, Record.class);
    return streamRecord;
}
/**
 * Reports the type of the elements this schema produces.
 *
 * @return type information for {@link Record}
 */
@Override
public TypeInformation<Record> getProducedType() {
    final TypeInformation<Record> producedType = TypeInformation.of(Record.class);
    return producedType;
}
/**
 * Creates the record processor that feeds DynamoDB Streams records to the enclosing replicator.
 * <p>
 * The processor unwraps the DynamoDB Streams records carried by the incoming Kinesis records, hands
 * them to {@code DynamoDBTableReplicator.this.processRecords} and then checkpoints. On shutdown it
 * checkpoints only when the shutdown reason is {@code TERMINATE}.
 *
 * @return a new {@link IRecordProcessor}
 */
protected IRecordProcessor createStreamProcessor() {
    return new IRecordProcessor() {
        @Override
        public void initialize(InitializationInput initializationInput) {
            // Nothing to initialize
        }

        /**
         * Extracts the DynamoDB Streams records wrapped by the given Kinesis records. Records that are
         * not {@link RecordAdapter} instances are left out of the result.
         */
        public List<Record> extractDynamoStreamRecords(List<com.amazonaws.services.kinesis.model.Record> kinesisRecords) {
            List<Record> dynamoRecords = new ArrayList<>(kinesisRecords.size());
            for (com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) {
                if (kinesisRecord instanceof RecordAdapter) {
                    Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject();
                    dynamoRecords.add(dynamoRecord);
                }
            }
            return dynamoRecords;
        }

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            List<Record> records = extractDynamoStreamRecords(processRecordsInput.getRecords());
            DynamoDBTableReplicator.this.processRecords(records);
            checkpoint(processRecordsInput.getCheckpointer());
        }

        @Override
        public void shutdown(ShutdownInput shutdownInput) {
            if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
                checkpoint(shutdownInput.getCheckpointer());
            }
        }

        /**
         * Checkpoints on a best-effort basis: checkpoint failures are logged and otherwise ignored.
         */
        void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) {
                // Pass the exception as the throwable so the log carries context and the stack trace
                LOG.warn("Failed to checkpoint stream record processor", e);
            }
        }
    };
}
/**
 * Builds the KCL {@link Worker} that replicates the source table's stream to the destination table.
 * <p>
 * Creates the DynamoDB, DynamoDB Streams, KCL DynamoDB and CloudWatch clients, looks up the source
 * table's latest stream ARN, validates that the stream exists and is enabled, and wires the
 * master-to-replicas pipeline into a KCL configuration.
 *
 * @return a worker ready to be run
 * @throws IllegalArgumentException
 *             if the source table has no stream ARN
 * @throws IllegalStateException
 *             if the stream is not reported as enabled
 */
public Worker createWorker() {
    // use default credential provider chain to locate appropriate credentials
    final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();

    // initialize DynamoDB client and set the endpoint properly for source table / region
    final AmazonDynamoDB dynamodbClient = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(createEndpointConfiguration(sourceRegion, sourceDynamodbEndpoint, AmazonDynamoDB.ENDPOINT_PREFIX))
        .build();

    // initialize Streams client
    final AwsClientBuilder.EndpointConfiguration streamsEndpointConfiguration = createEndpointConfiguration(sourceRegion,
        sourceDynamodbStreamsEndpoint, AmazonDynamoDBStreams.ENDPOINT_PREFIX);
    final ClientConfiguration streamsClientConfig = new ClientConfiguration().withGzip(false);
    final AmazonDynamoDBStreams streamsClient = AmazonDynamoDBStreamsClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(streamsEndpointConfiguration)
        .withClientConfiguration(streamsClientConfig)
        .build();

    // obtain the Stream ID associated with the source table
    final String streamArn = dynamodbClient.describeTable(sourceTable).getTable().getLatestStreamArn();
    // validate the ARN before using it, so a table without a stream fails with the intended message
    Preconditions.checkArgument(streamArn != null, DynamoDBConnectorConstants.MSG_NO_STREAMS_FOUND);
    final boolean streamEnabled = DynamoDBConnectorUtilities.isStreamsEnabled(streamsClient, streamArn, DynamoDBConnectorConstants.NEW_AND_OLD);
    Preconditions.checkState(streamEnabled, DynamoDBConnectorConstants.STREAM_NOT_READY);

    // initialize DynamoDB client for KCL
    final AmazonDynamoDB kclDynamoDBClient = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(createKclDynamoDbEndpointConfiguration())
        .build();

    // initialize DynamoDB Streams Adapter client and set the Streams endpoint properly
    final AmazonDynamoDBStreamsAdapterClient streamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient);

    // initialize CloudWatch client and set the region to emit metrics to
    final AmazonCloudWatch kclCloudWatchClient;
    if (isPublishCloudWatch) {
        kclCloudWatchClient = AmazonCloudWatchClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion(kclRegion.or(sourceRegion).getName()).build();
    } else {
        kclCloudWatchClient = new NoopCloudWatch();
    }

    // resolve the endpoint of the destination table
    final AwsClientBuilder.EndpointConfiguration destinationEndpointConfiguration = createEndpointConfiguration(destinationRegion,
        destinationDynamodbEndpoint, AmazonDynamoDB.ENDPOINT_PREFIX);

    // try to get taskname from command line arguments, auto generate one if needed
    final String actualTaskName = DynamoDBConnectorUtilities.getTaskName(sourceRegion, destinationRegion, taskName, sourceTable, destinationTable);

    // set the appropriate Connector properties for the destination KCL configuration
    final Properties properties = new Properties();
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_APP_NAME, actualTaskName);
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_DYNAMODB_ENDPOINT, destinationEndpointConfiguration.getServiceEndpoint());
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_DYNAMODB_DATA_TABLE_NAME, destinationTable);
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_REGION_NAME, destinationRegion.getName());

    // create the record processor factory based on given pipeline and connector configurations
    // use the master to replicas pipeline
    final KinesisConnectorRecordProcessorFactory<Record, Record> factory = new KinesisConnectorRecordProcessorFactory<>(
        new DynamoDBMasterToReplicasPipeline(), new DynamoDBStreamsConnectorConfiguration(properties, credentialsProvider));

    // create the KCL configuration with default values
    final KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(actualTaskName,
        streamArn,
        credentialsProvider,
        DynamoDBConnectorConstants.WORKER_LABEL + actualTaskName + UUID.randomUUID().toString())
        // worker will use checkpoint table if available, otherwise it is safer
        // to start at beginning of the stream
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        // we want the maximum batch size to avoid network transfer latency overhead
        .withMaxRecords(getRecordsLimit.or(DynamoDBConnectorConstants.STREAMS_RECORDS_LIMIT))
        // wait a reasonable amount of time - default 0.5 seconds
        .withIdleTimeBetweenReadsInMillis(DynamoDBConnectorConstants.IDLE_TIME_BETWEEN_READS)
        // Remove calls to GetShardIterator
        .withValidateSequenceNumberBeforeCheckpointing(false)
        // make parent shard poll interval tunable to decrease time to run integration test
        .withParentShardPollIntervalMillis(parentShardPollIntervalMillis.or(DynamoDBConnectorConstants.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS))
        // avoid losing leases too often - default 60 seconds
        .withFailoverTimeMillis(DynamoDBConnectorConstants.KCL_FAILOVER_TIME);

    // create the KCL worker for this connector
    return new Worker(factory, kclConfig, streamsAdapterClient, kclDynamoDBClient, kclCloudWatchClient);
}
/**
 * Creates the transformer for this pipeline. The configuration argument is not used.
 *
 * @return a new {@link DynamoDBStreamsRecordTransformer}
 */
@Override
public ITransformer<Record, Record> getTransformer(final KinesisConnectorConfiguration configuration) {
    final ITransformer<Record, Record> transformer = new DynamoDBStreamsRecordTransformer();
    return transformer;
}
/**
 * Creates the filter for this pipeline. The configuration argument is not used.
 *
 * @return a new {@link AllPassFilter} for DynamoDB Streams records
 */
@Override
public IFilter<Record> getFilter(final KinesisConnectorConfiguration configuration) {
    final IFilter<Record> passEverything = new AllPassFilter<Record>();
    return passEverything;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation is an identity transformation: the given record is returned unchanged.
 */
@Override
public Record fromClass(final Record record) throws IOException {
    // The emitter consumes DynamoDB stream records as-is, so no conversion is needed
    final Record unchanged = record;
    return unchanged;
}
/**
 * {@inheritDoc}
 * <p>
 * Submits one asynchronous DynamoDB write per buffered record and blocks until every record has
 * either completed or been skipped. Records for which no supported request can be built are
 * skipped: they are counted as done, so the wait loop terminates, but they are not reported as
 * failures.
 *
 * @param buffer
 *            the records to emit
 * @return the records collected in the failed-records list passed to the async handlers; empty
 *         when called with an empty buffer after shutdown
 * @throws IllegalStateException
 *             if called with a non-empty buffer after the emitter has been shut down
 */
@SuppressWarnings("unchecked")
@Override
public List<Record> emit(final UnmodifiableBuffer<Record> buffer) {
    if (isShutdown) {
        if (buffer.getRecords().isEmpty()) {
            // This is OK, but not expected
            log.warn("Record processor called emit after calling shutdown. Continuing becuase buffer is empty.");
            return Collections.emptyList();
        } else {
            throw new IllegalStateException("Cannot emit records after emitter has been shutdown.");
        }
    }
    // Asynchronously process all writes, but block on the results.
    List<Record> records = buffer.getRecords();
    // Stores records that failed with a non-retryable exception
    final List<Record> failedRecords = Collections.synchronizedList(new ArrayList<Record>());
    // Queue of records to submit
    final BlockingQueue<Record> toSubmit = new LinkedBlockingQueue<Record>(records);
    // Used to detect when all requests have either succeeded or resulted in a non-retryable exception
    final CountDownLatch doneSignal = new CountDownLatch(records.size());
    final AtomicInteger retryCount = new AtomicInteger();
    // Records that were never submitted because no supported request could be built for them
    int skippedRecords = 0;
    boolean interrupted = false;
    try {
        while (doneSignal.getCount() > 0) {
            Record recordToSubmit = null;
            try {
                recordToSubmit = toSubmit.poll(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Remember the interrupt and restore it once all records are accounted for
                interrupted = true;
            }
            final Record record = recordToSubmit;
            if (null == record) {
                continue; // Check if all records have completed and if not try to poll again
            }
            // Generate the request based on the record
            AmazonWebServiceRequest request = createRequest(record);
            if (request == null) { // Should only happen if DynamoDB Streams API updates to support different operations
                // than {INSERT, MODIFY, REMOVE}.
                // No handler will ever run for this record, so release its latch slot here;
                // otherwise the loop would wait forever.
                skippedRecords++;
                doneSignal.countDown();
                continue;
            }
            // Submit the write request based on its type
            if (request instanceof PutItemRequest) { // PUT
                getDynamodb().putItemAsync((PutItemRequest) request,
                    (AsyncHandler<PutItemRequest, PutItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else if (request instanceof DeleteItemRequest) { // DELETE
                getDynamodb().deleteItemAsync((DeleteItemRequest) request,
                    (AsyncHandler<DeleteItemRequest, DeleteItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else if (request instanceof UpdateItemRequest) { // UPDATE
                getDynamodb().updateItemAsync((UpdateItemRequest) request,
                    (AsyncHandler<UpdateItemRequest, UpdateItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else { // Should only happen if DynamoDB allows a new operation other than {PutItem, DeleteItem,
                // UpdateItem} for single item writes.
                log.warn("Unsupported DynamoDB request: " + request);
                // Nothing was submitted, so release this record's latch slot as well
                skippedRecords++;
                doneSignal.countDown();
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
    emitCloudWatchMetrics(records, failedRecords, retryCount);
    if (!records.isEmpty()) {
        log.debug("Successfully emitted " + (records.size() - failedRecords.size() - skippedRecords) + " records ending with sequence number "
            + buffer.getLastSequenceNumber());
    } else {
        log.debug("No records to emit");
    }
    return failedRecords;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns a new list holding the buffer's current values; changes to the returned list do not
 * affect the buffer.
 */
@Override
public List<Record> getRecords() {
    // Copy the buffered values into a fresh list
    final List<Record> snapshot = new ArrayList<Record>();
    snapshot.addAll(getBuffer().values());
    return snapshot;
}
/**
 * Exposes the underlying map of buffered records, keyed by attribute-value maps.
 *
 * @return the buffer itself, not a copy
 */
public Map<Map<String, AttributeValue>, Record> getBuffer() {
    return this.buffer;
}
/**
 * Builds the emitter under test from fixed test values and static dummy credentials.
 *
 * @return a new {@link DynamoDBReplicationEmitter} for use in tests
 */
@Override
protected IEmitter<Record> createEmitterInstance() {
    final AWSStaticCredentialsProvider credentials =
        new AWSStaticCredentialsProvider(new BasicAWSCredentials("Access", "Secret"));
    return new DynamoDBReplicationEmitter("TEST", "ENDPOINT", "REGION", "TABLE", null, credentials);
}
/**
 * Reads a DynamoDB Streams {@link Record} from the raw message bytes with {@code MAPPER}.
 * All arguments other than {@code message} are unused.
 *
 * @return the record decoded from {@code message}
 * @throws IOException
 *             if the bytes cannot be read as a {@link Record}
 */
@Override
public Record deserialize(byte[] message, String partitionKey, String seqNum,
        long approxArrivalTimestamp, String stream, String shardId) throws IOException {
    final Record decoded = MAPPER.readValue(message, Record.class);
    return decoded;
}
/**
 * Returns the type of the elements this schema produces.
 *
 * @return type information for {@link Record}
 */
@Override
public TypeInformation<Record> getProducedType() {
    final TypeInformation<Record> typeInfo = TypeInformation.of(Record.class);
    return typeInfo;
}