类com.amazonaws.services.dynamodbv2.model.StreamSpecification源码实例Demo

下面列出了怎么用com.amazonaws.services.dynamodbv2.model.StreamSpecification的API类实例代码及写法,或者点击链接到github查看源代码。

/**
 * @return StreamArn
 */
public static String createTable(AmazonDynamoDB client, String tableName) {
    java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
    attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

    java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
    keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                             // key

    ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
        .withWriteCapacityUnits(2L);

    StreamSpecification streamSpecification = new StreamSpecification();
    streamSpecification.setStreamEnabled(true);
    streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
    CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
        .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
        .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

    try {
        System.out.println("Creating table " + tableName);
        CreateTableResult result = client.createTable(createTableRequest);
        return result.getTableDescription().getLatestStreamArn();
    }
    catch (ResourceInUseException e) {
        System.out.println("Table already exists.");
        return describeTable(client, tableName).getTable().getLatestStreamArn();
    }
}
 
源代码2 项目: wildfly-camel   文件: DynamoDBUtils.java
public static TableDescription createTable(AmazonDynamoDB client, String tableName) throws InterruptedException {
    CreateTableRequest tableReq = new CreateTableRequest().withTableName(tableName)
            .withKeySchema(new KeySchemaElement("Id", KeyType.HASH))
            .withAttributeDefinitions(new AttributeDefinition("Id", ScalarAttributeType.N))
            .withProvisionedThroughput(new ProvisionedThroughput(10L, 10L))
            .withStreamSpecification(new StreamSpecification().withStreamEnabled(true).withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES));

    DynamoDB dynamoDB = new DynamoDB(client);
    Table table = dynamoDB.createTable(tableReq);
    return table.waitForActive();
}
 
/**
 * @return StreamArn
 */
public static String createTable(AmazonDynamoDBClient client, String tableName) {
    java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
    attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

    java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
    keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH));

    ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput()
        .withReadCapacityUnits(2L).withWriteCapacityUnits(2L);

    StreamSpecification streamSpecification = new StreamSpecification();
    streamSpecification.setStreamEnabled(true);
    streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
    CreateTableRequest createTableRequest = new CreateTableRequest()
        .withTableName(tableName)
        .withAttributeDefinitions(attributeDefinitions)
        .withKeySchema(keySchema)
        .withProvisionedThroughput(provisionedThroughput)
        .withStreamSpecification(streamSpecification);

    try {
        System.out.println("Creating table " + tableName);
        CreateTableResult result = client.createTable(createTableRequest);
        return result.getTableDescription().getLatestStreamArn();
    } catch(ResourceInUseException e) {
        System.out.println("Table already exists.");
        return describeTable(client, tableName).getTable().getLatestStreamArn();
    }
}
 
@Test
public void testBothTablesAddStreamAfterCreation() throws InterruptedException {
    //create table one in one region
    final CreateTableRequest iadCreateTableRequest = createTableRequest(INVENTORY_TABLE_IAD);
    dynamoDbIad.createTable(iadCreateTableRequest
            .withStreamSpecification(new StreamSpecification()
                    .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
                    .withStreamEnabled(true)));
    //create table two in another region
    final CreateTableRequest pdxCreateTableRequest = createTableRequest(INVENTORY_TABLE_PDX);
    dynamoDbPdx.createTable(pdxCreateTableRequest);

    //create and start the command line client and worker
    final List<String> commandLineArgs = Lists.newArrayList(
            "--sourceEndpoint",
            DYNAMODB_LOCAL_ENDPOINT,
            // override the signing region as DynamoDB Local uses it to create different table namespaces
            "--sourceSigningRegion",
            Regions.US_EAST_1.getName(),
            "--sourceTable",
            INVENTORY_TABLE_IAD,
            "--destinationEndpoint",
            DYNAMODB_LOCAL_ENDPOINT,
            // override the signing region as DynamoDB Local uses it to create different table namespaces
            "--destinationSigningRegion",
            Regions.US_WEST_2.getName(),
            "--destinationTable",
            INVENTORY_TABLE_PDX,
            "--taskName",
            CRR_INTEGRATION_TEST,
            // 100ms - override to reduce the time to sleep
            "--parentShardPollIntervalMillis",
            "100",
            "--dontPublishCloudwatch");
    final String[] args = commandLineArgs.toArray(new String[commandLineArgs.size()]);
    final Worker worker = CommandLineInterface.mainUnsafe(args).get();
    final Thread workerThread = new Thread(worker, "KCLWorker");
    workerThread.start();

    //perform the updates on the source table
    final Item asin1sea = new Item().withString(SKU_CODE, ASIN_1).withString(STORE, SEA);
    iadTable.putItem(asin1sea);
    final Item asin1seaRead = iadTable.getItem(SKU_CODE, ASIN_1, STORE, SEA);
    assertEquals(asin1sea, asin1seaRead);

    //verify the updates on the destination table
    //wait for the worker to start and the update to propagate
    Thread.sleep(10000);
    final List<Item> pdxItems = new ArrayList<>();
    for(Item item : pdxTable.scan()) {
        pdxItems.add(item);
    }
    assertEquals(1, pdxItems.size());
    final Item copied = Iterables.getOnlyElement(pdxItems);
    assertEquals(asin1sea, copied);

    //close the worker
    worker.shutdown(); //this leaks threads, I wonder
}
 
 类方法
 同包方法