下面列出了 com.amazonaws.services.dynamodbv2.model.StreamSpecification 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a stream-enabled table, falling back to a lookup when the table is already present.
 *
 * <p>The request declares one numeric attribute {@code Id} used as the hash (partition) key,
 * 2 read / 2 write capacity units, and an enabled stream with the {@code NEW_IMAGE} view type.
 *
 * @param client    client on which the create call (and, on conflict, the describe call) is made
 * @param tableName name of the table to create
 * @return the latest stream ARN taken from the create result, or from {@code describeTable}
 *         when a {@link ResourceInUseException} is thrown
 */
public static String createTable(AmazonDynamoDB client, String tableName) {
    StreamSpecification streamSpec = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_IMAGE);

    CreateTableRequest request = new CreateTableRequest()
            .withTableName(tableName)
            .withAttributeDefinitions(
                    new AttributeDefinition().withAttributeName("Id").withAttributeType("N"))
            // "Id" is the partition key.
            .withKeySchema(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH))
            .withProvisionedThroughput(
                    new ProvisionedThroughput().withReadCapacityUnits(2L).withWriteCapacityUnits(2L))
            .withStreamSpecification(streamSpec);

    try {
        System.out.println("Creating table " + tableName);
        return client.createTable(request).getTableDescription().getLatestStreamArn();
    } catch (ResourceInUseException alreadyExists) {
        // The table is already there: report it and read the ARN from its description instead.
        System.out.println("Table already exists.");
        return describeTable(client, tableName).getTable().getLatestStreamArn();
    }
}
/**
 * Creates a stream-enabled table through the document API wrapper and waits on it.
 *
 * <p>The table has a numeric {@code Id} hash key, 10 read / 10 write capacity units, and an
 * enabled stream with the {@code NEW_AND_OLD_IMAGES} view type.
 *
 * @param client    low-level client wrapped in a {@link DynamoDB} instance for the call
 * @param tableName name of the table to create
 * @return the value returned by {@code Table.waitForActive()} for the new table
 * @throws InterruptedException propagated from {@code Table.waitForActive()}
 */
public static TableDescription createTable(AmazonDynamoDB client, String tableName) throws InterruptedException {
    StreamSpecification streamSpec = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);

    CreateTableRequest request = new CreateTableRequest()
            .withTableName(tableName)
            .withKeySchema(new KeySchemaElement("Id", KeyType.HASH))
            .withAttributeDefinitions(new AttributeDefinition("Id", ScalarAttributeType.N))
            .withProvisionedThroughput(new ProvisionedThroughput(10L, 10L))
            .withStreamSpecification(streamSpec);

    return new DynamoDB(client).createTable(request).waitForActive();
}
/**
 * Issues a create-table call for a stream-enabled table and reports its stream ARN.
 *
 * <p>Table layout: numeric attribute {@code Id} as the hash key, provisioned throughput of
 * 2 reads and 2 writes, stream enabled with view type {@code NEW_IMAGE}.
 *
 * @param client    client that performs the create call and, if needed, the describe call
 * @param tableName name of the table to create
 * @return the latest stream ARN; read from the create result, or via {@code describeTable}
 *         if the create call throws {@link ResourceInUseException}
 */
public static String createTable(AmazonDynamoDBClient client, String tableName) {
    AttributeDefinition idAttribute = new AttributeDefinition()
            .withAttributeName("Id")
            .withAttributeType("N");
    KeySchemaElement idKey = new KeySchemaElement()
            .withAttributeName("Id")
            .withKeyType(KeyType.HASH);
    ProvisionedThroughput throughput = new ProvisionedThroughput()
            .withReadCapacityUnits(2L)
            .withWriteCapacityUnits(2L);
    StreamSpecification streamSpec = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_IMAGE);

    CreateTableRequest request = new CreateTableRequest()
            .withTableName(tableName)
            .withAttributeDefinitions(idAttribute)
            .withKeySchema(idKey)
            .withProvisionedThroughput(throughput)
            .withStreamSpecification(streamSpec);

    try {
        System.out.println("Creating table " + tableName);
        CreateTableResult created = client.createTable(request);
        return created.getTableDescription().getLatestStreamArn();
    } catch (ResourceInUseException alreadyExists) {
        // Creation was rejected because the table exists; look the ARN up instead.
        System.out.println("Table already exists.");
        return describeTable(client, tableName).getTable().getLatestStreamArn();
    }
}
/**
 * End-to-end check that an item written to the source table shows up in the destination table.
 *
 * <p>Steps: create the source table with a {@code NEW_AND_OLD_IMAGES} stream, create the
 * destination table, start the worker built by {@code CommandLineInterface.mainUnsafe}, put one
 * item into the source table, wait, then assert that a scan of the destination table yields
 * exactly that item.
 *
 * <p>The worker is shut down in a {@code finally} block so that a failed assertion (or an
 * exception from the put/get/scan calls) does not leave the worker running.
 *
 * @throws InterruptedException if the fixed wait for propagation is interrupted
 */
@Test
public void testBothTablesAddStreamAfterCreation() throws InterruptedException {
    //create table one in one region
    final CreateTableRequest iadCreateTableRequest = createTableRequest(INVENTORY_TABLE_IAD);
    dynamoDbIad.createTable(iadCreateTableRequest
        .withStreamSpecification(new StreamSpecification()
            .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
            .withStreamEnabled(true)));

    //create table two in another region
    final CreateTableRequest pdxCreateTableRequest = createTableRequest(INVENTORY_TABLE_PDX);
    dynamoDbPdx.createTable(pdxCreateTableRequest);

    //create and start the command line client and worker
    final List<String> commandLineArgs = Lists.newArrayList(
        "--sourceEndpoint",
        DYNAMODB_LOCAL_ENDPOINT,
        // override the signing region as DynamoDB Local uses it to create different table namespaces
        "--sourceSigningRegion",
        Regions.US_EAST_1.getName(),
        "--sourceTable",
        INVENTORY_TABLE_IAD,
        "--destinationEndpoint",
        DYNAMODB_LOCAL_ENDPOINT,
        // override the signing region as DynamoDB Local uses it to create different table namespaces
        "--destinationSigningRegion",
        Regions.US_WEST_2.getName(),
        "--destinationTable",
        INVENTORY_TABLE_PDX,
        "--taskName",
        CRR_INTEGRATION_TEST,
        // 100ms - override to reduce the time to sleep
        "--parentShardPollIntervalMillis",
        "100",
        "--dontPublishCloudwatch");
    final String[] args = commandLineArgs.toArray(new String[commandLineArgs.size()]);
    final Worker worker = CommandLineInterface.mainUnsafe(args).get();
    final Thread workerThread = new Thread(worker, "KCLWorker");
    workerThread.start();

    try {
        //perform the updates on the source table
        final Item asin1sea = new Item().withString(SKU_CODE, ASIN_1).withString(STORE, SEA);
        iadTable.putItem(asin1sea);
        final Item asin1seaRead = iadTable.getItem(SKU_CODE, ASIN_1, STORE, SEA);
        assertEquals(asin1sea, asin1seaRead);

        //verify the updates on the destination table
        //wait for the worker to start and the update to propagate
        Thread.sleep(10000);
        final List<Item> pdxItems = new ArrayList<>();
        for (Item item : pdxTable.scan()) {
            pdxItems.add(item);
        }
        assertEquals(1, pdxItems.size());
        final Item copied = Iterables.getOnlyElement(pdxItems);
        assertEquals(asin1sea, copied);
    } finally {
        //close the worker even when an assertion above fails, so the started thread is asked to stop
        //NOTE(review): the original author suspected shutdown() may still leak threads - not verified here
        worker.shutdown();
    }
}