下面列出了 com.amazonaws.services.dynamodbv2.model.StreamRecord 类的 API 使用示例代码及写法；也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a {@code SourceRecord} for a single DynamoDB stream record.
 *
 * <p>The record's sequence number is stored as the source offset under
 * {@code Keys.SEQNUM}. The record's keys and its new image are each converted with
 * {@code RecordMapper.toConnect} and paired with {@code RecordMapper.attributesSchema()}.
 * The approximate creation time of the stream record, in epoch milliseconds, is passed
 * as the last constructor argument.
 *
 * @param sourcePartition the source partition passed through unchanged
 * @param topic the destination topic name
 * @param dynamoRecord the stream record to convert
 * @return the newly created {@code SourceRecord}
 */
private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, StreamRecord dynamoRecord) {
    // Single-entry offset map: Keys.SEQNUM -> sequence number of this stream record.
    final Map<String, ?> sourceOffset =
            Collections.singletonMap(Keys.SEQNUM, dynamoRecord.getSequenceNumber());

    // NOTE(review): the argument pairs below are presumably (key schema, key) and
    // (value schema, value), and the null presumably the topic partition — confirm
    // against the SourceRecord constructor being used.
    return new SourceRecord(
            sourcePartition,
            sourceOffset,
            topic,
            null,
            RecordMapper.attributesSchema(),
            RecordMapper.toConnect(dynamoRecord.getKeys()),
            RecordMapper.attributesSchema(),
            RecordMapper.toConnect(dynamoRecord.getNewImage()),
            dynamoRecord.getApproximateCreationDateTime().getTime());
}
/**
 * Processes a batch of DynamoDB stream records and e-mails one expense approval
 * request per record that carries a new item image.
 *
 * <p>Every record is handled on its own: the extracted fields are reset for each
 * record, so values never leak from one record into the next, and every expense
 * entry in the batch produces its own e-mail. Records that have no new image (for
 * example {@code REMOVE} events) are logged and skipped instead of failing the whole
 * batch with a {@code NullPointerException}. An empty batch sends no e-mail.
 *
 * <p>Recognised attributes of the new image are {@code employee_id},
 * {@code employee_name}, {@code expense_type} (read as strings) and {@code amount}
 * (read as a number); any other attribute is logged as unknown.
 *
 * @param input the stream event whose records are processed
 * @param context the invocation context; only its logger is used
 * @return always {@code null}
 */
@Override
public Object handleRequest(DynamodbEvent input, Context context) {
    String from = "[email protected]"; // TODO Replace with your "From" address.
                                        // This address must be verified.
    String to = "[email protected]"; // TODO Replace with a "To" address. If you
                                      // have not yet requested production
                                      // access, this address must be
                                      // verified.

    for (DynamodbStreamRecord r : input.getRecords()) {
        StreamRecord sr = r.getDynamodb();
        // No new image means there is nothing to report for this record; skip it
        // rather than aborting the remaining records of the batch.
        if (sr == null || sr.getNewImage() == null) {
            context.getLogger().log(
                    "Skipping " + r.getEventName() + " record without a new image.");
            continue;
        }

        // Per-record state: reset for every record so entries cannot mix.
        String employeeId = "";
        String employeeName = "";
        String expenseType = "";
        double amount = 0.0;

        for (Entry<String, AttributeValue> entry : sr.getNewImage().entrySet()) {
            String k = entry.getKey();
            AttributeValue v = entry.getValue();
            switch (k) {
                case "employee_id":
                    employeeId = v.getS();
                    break;
                case "employee_name":
                    employeeName = v.getS();
                    break;
                case "expense_type":
                    expenseType = v.getS();
                    break;
                case "amount":
                    amount = Double.parseDouble(v.getN());
                    break;
                default:
                    context.getLogger().log("Key " + k + " is unknown.");
                    break;
            }
        }

        context.getLogger().log(
                "ENTRY: " + employeeId + " | " + employeeName + " | "
                        + expenseType + " | " + amount);

        String subject = String.format("Expense reimbursment request by %s",
                employeeName);
        // TODO Replace with your own approval URL
        String approvalUrl = String
                .format("https://.......execute-api.eu-west-1.amazonaws.com/test/reimbursment?id=%s",
                        employeeId);
        String body = String
                .format("Hello boss,\n\nplease approve my expense reimbursment:\n%s\n\nExpense type: %s\nAmount: %s EUR\n\nThanks!\n%s\nEmployee ID: %s ",
                        approvalUrl, expenseType, amount, employeeName,
                        employeeId);

        sendMail(from, to, subject, body);
        context.getLogger().log("Email sent from " + from + " to " + to);
    }
    return null;
}
/**
 * Integration test that writes and updates an item through the {@code aws-ddb}
 * endpoint and checks that the update is delivered by the {@code aws-ddbstream}
 * endpoint with both the old and the new image.
 *
 * <p>The test is skipped when no DynamoDB client is available. The table is created
 * for the test and deleted afterwards; stale tables are asserted absent before and
 * after the run.
 *
 * @throws Exception if table setup, route setup or any Camel interaction fails
 */
@Test
public void testKeyValueOperations() throws Exception {
    AmazonDynamoDBClient ddbClient = ddbProvider.getClient();
    Assume.assumeNotNull("AWS client not null", ddbClient);

    DynamoDBUtils.assertNoStaleTables(ddbClient, "before");
    try {
        try {
            TableDescription description = DynamoDBUtils.createTable(ddbClient, tableName);
            Assert.assertEquals("ACTIVE", description.getTableStatus());

            WildFlyCamelContext camelctx = new WildFlyCamelContext();
            // Both clients are bound by name so the endpoint URIs can reference them.
            camelctx.getNamingContext().bind("ddbClientB", ddbClient);
            camelctx.getNamingContext().bind("dbsClientB", dbsProvider.getClient());

            camelctx.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("direct:start").to("aws-ddb://" + tableName + "?amazonDDBClient=#ddbClientB");
                    from("aws-ddbstream://" + tableName + "?amazonDynamoDbStreamsClient=#dbsClientB")
                            .to("seda:end");
                }
            });

            PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
            pollingConsumer.start();

            camelctx.start();
            try {
                DynamoDBUtils.putItem(camelctx, "Book 103 Title");

                String result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Title", result);

                // The test expects no stream exchange within 3 s of the initial put.
                // NOTE(review): the reason is not visible here — confirm it is intended.
                Exchange exchange = pollingConsumer.receive(3000);
                Assert.assertNull(exchange);

                DynamoDBUtils.updItem(camelctx, "Book 103 Update");

                result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Update", result);

                exchange = pollingConsumer.receive(3000);
                // Fail with a clear assertion, not a NullPointerException, when the
                // stream record does not arrive before the timeout.
                Assert.assertNotNull("Expected a stream exchange for the update", exchange);

                StreamRecord record = exchange.getIn().getBody(Record.class).getDynamodb();
                Map<String, AttributeValue> oldImage = record.getOldImage();
                Map<String, AttributeValue> newImage = record.getNewImage();
                Assert.assertEquals("Book 103 Title", oldImage.get("Title").getS());
                Assert.assertEquals("Book 103 Update", newImage.get("Title").getS());
            } finally {
                camelctx.close();
            }
        } finally {
            DynamoDBUtils.deleteTable(ddbClient, tableName);
        }
    } finally {
        DynamoDBUtils.assertNoStaleTables(ddbClient, "after");
    }
}