类com.amazonaws.services.dynamodbv2.model.StreamRecord源码实例Demo

下面列出了 com.amazonaws.services.dynamodbv2.model.StreamRecord 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

/**
 * Wraps a DynamoDB stream record in a {@link SourceRecord}.
 *
 * <p>The source offset is a single-entry map from {@code Keys.SEQNUM} to the
 * record's sequence number. The record key is built from the stream record's
 * keys and the record value from its new image, both converted through
 * {@code RecordMapper.toConnect} and paired with
 * {@code RecordMapper.attributesSchema()}. The timestamp is the stream
 * record's approximate creation time in epoch milliseconds.
 *
 * @param sourcePartition source partition passed through unchanged
 * @param topic           topic passed through unchanged
 * @param dynamoRecord    stream record to convert
 * @return the newly constructed source record
 */
private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, StreamRecord dynamoRecord) {
    final Map<String, String> sourceOffset =
            Collections.singletonMap(Keys.SEQNUM, dynamoRecord.getSequenceNumber());

    return new SourceRecord(
            sourcePartition,
            sourceOffset,
            topic,
            null, // partition argument is left unset
            // key schema + key
            RecordMapper.attributesSchema(),
            RecordMapper.toConnect(dynamoRecord.getKeys()),
            // value schema + value
            RecordMapper.attributesSchema(),
            RecordMapper.toConnect(dynamoRecord.getNewImage()),
            // record timestamp
            dynamoRecord.getApproximateCreationDateTime().getTime()
    );
}
 
/**
 * Handles a batch of DynamoDB stream records and sends one reimbursement
 * approval mail per record.
 *
 * <p>For every record that carries a new image, the attributes
 * {@code employee_id}, {@code employee_name}, {@code expense_type} (string
 * values) and {@code amount} (numeric value) are read from the image; any
 * other attribute name is logged as unknown. The extracted values are logged
 * and then mailed via {@code sendMail}.
 *
 * <p>Records without a new image are logged and skipped instead of failing
 * the whole invocation. Each record is mailed individually, so a batch with
 * several records no longer collapses into a single mail for the last one,
 * and an empty batch sends nothing.
 *
 * @param input   the stream event whose records are processed
 * @param context Lambda context; only its logger is used
 * @return always {@code null}
 */
@Override
public Object handleRequest(DynamodbEvent input, Context context) {
	String from = "[email protected]"; // TODO Replace with your "From" address.
										// This address must be verified.
	String to = "[email protected]"; // TODO Replace with a "To" address. If you
										// have not yet requested production
										// access, this address must be
										// verified.

	for (DynamodbStreamRecord r : input.getRecords()) {
		StreamRecord sr = r.getDynamodb();
		if (sr == null || sr.getNewImage() == null) {
			// Nothing to report for this record (no post-change item image).
			context.getLogger().log(
					"Skipping record " + r.getEventID() + " ("
							+ r.getEventName() + "): no new image.");
			continue;
		}

		// Per-record state: declared inside the loop so values never leak
		// from one record into the next.
		String employeeId = "";
		String employeeName = "";
		String expenseType = "";
		Double amount = 0.0;

		for (Entry<String, AttributeValue> entry : sr.getNewImage()
				.entrySet()) {
			String k = entry.getKey();
			AttributeValue v = entry.getValue();
			switch (k) {
			case "employee_id":
				employeeId = v.getS();
				break;
			case "employee_name":
				employeeName = v.getS();
				break;
			case "expense_type":
				expenseType = v.getS();
				break;
			case "amount":
				amount = Double.valueOf(v.getN());
				break;
			default:
				context.getLogger().log("Key " + k + " is unknown.");
				break;
			}
		}

		context.getLogger().log(
				"ENTRY: " + employeeId + " | " + employeeName + " | "
						+ expenseType + " | " + amount);

		String subject = String.format("Expense reimbursment request by %s",
				employeeName);
		// TODO Replace with your own approval URL
		String approvalUrl = String
				.format("https://.......execute-api.eu-west-1.amazonaws.com/test/reimbursment?id=%s",
						employeeId);
		String body = String
				.format("Hello boss,\n\nplease approve my expense reimbursment:\n%s\n\nExpense type: %s\nAmount: %s EUR\n\nThanks!\n%s\nEmployee ID: %s ",
						approvalUrl, expenseType, amount, employeeName,
						employeeId);
		sendMail(from, to, subject, body);
		context.getLogger().log("Email sent from " + from + " to " + to);
	}

	return null;
}
 
/**
 * Integration test: writes and updates an item through the {@code aws-ddb}
 * endpoint and checks that the {@code aws-ddbstream} endpoint delivers the
 * update with the expected old and new images.
 *
 * <p>The table is created at the start and deleted in a {@code finally}
 * block; stale-table checks run before and after.
 */
@Test
public void testKeyValueOperations() throws Exception {

    AmazonDynamoDBClient ddbClient = ddbProvider.getClient();
    // NOTE(review): assumeNotNull takes only varargs values, so the string
    // here is checked as just another non-null value, not used as a message.
    Assume.assumeNotNull("AWS client not null", ddbClient);

    DynamoDBUtils.assertNoStaleTables(ddbClient, "before");

    try {
        try {
            TableDescription description = DynamoDBUtils.createTable(ddbClient, tableName);
            Assert.assertEquals("ACTIVE", description.getTableStatus());

            WildFlyCamelContext camelctx = new WildFlyCamelContext();
            camelctx.getNamingContext().bind("ddbClientB", ddbClient);
            camelctx.getNamingContext().bind("dbsClientB", dbsProvider.getClient());

            camelctx.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    // Writes go to the table; stream events are collected on seda:end.
                    from("direct:start").to("aws-ddb://" + tableName + "?amazonDDBClient=#ddbClientB");
                    from("aws-ddbstream://" + tableName + "?amazonDynamoDbStreamsClient=#dbsClientB")
                            .to("seda:end");
                }
            });

            PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
            pollingConsumer.start();

            camelctx.start();
            try {
                DynamoDBUtils.putItem(camelctx, "Book 103 Title");

                String result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Title", result);

                // The test expects no stream exchange within 3 s of the initial put.
                Exchange exchange = pollingConsumer.receive(3000);
                Assert.assertNull(exchange);

                DynamoDBUtils.updItem(camelctx, "Book 103 Update");

                result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Update", result);

                exchange = pollingConsumer.receive(3000);
                // receive(timeout) yields null on timeout; fail with a clear
                // assertion instead of a NullPointerException below.
                Assert.assertNotNull("Stream exchange received after update", exchange);
                StreamRecord record = exchange.getIn().getBody(Record.class).getDynamodb();
                Map<String, AttributeValue> oldImage = record.getOldImage();
                Map<String, AttributeValue> newImage = record.getNewImage();
                Assert.assertEquals("Book 103 Title", oldImage.get("Title").getS());
                Assert.assertEquals("Book 103 Update", newImage.get("Title").getS());

            } finally {
                camelctx.close();
            }
        } finally {
            DynamoDBUtils.deleteTable(ddbClient, tableName);
        }
    } finally {
        DynamoDBUtils.assertNoStaleTables(ddbClient, "after");
    }
}
 
 类方法
 同包方法