类com.amazonaws.services.dynamodbv2.model.Record源码实例Demo

下面列出了怎么用com.amazonaws.services.dynamodbv2.model.Record的API类实例代码及写法,或者点击链接到github查看源代码。

/**
 * Creates a DynamoDB write request based on the DynamoDB Stream record.
 *
 * @param record
 *            The DynamoDB Stream record containing information about the update to a DynamoDB table
 * @return A DynamoDB request based on the DynamoDB Stream record
 */
private AmazonWebServiceRequest createRequest(final Record record) {
    final String eventName = record.getEventName();
    final AmazonWebServiceRequest request;
    if (eventName.equalsIgnoreCase(OperationType.INSERT.toString()) || eventName.equalsIgnoreCase(OperationType.MODIFY.toString())) {
        // For INSERT or MODIFY: Put the new image in the DynamoDB table
        PutItemRequest putItemRequest = new PutItemRequest();
        putItemRequest.setItem(record.getDynamodb().getNewImage());
        putItemRequest.setTableName(getTableName());
        request = putItemRequest;
    } else if (eventName.equalsIgnoreCase(OperationType.REMOVE.toString())) {
        // For REMOVE: Delete the item from the DynamoDB table
        DeleteItemRequest deleteItemRequest = new DeleteItemRequest();
        deleteItemRequest.setKey(record.getDynamodb().getKeys());
        deleteItemRequest.setTableName(getTableName());
        request = deleteItemRequest;
    } else {
        // This should only happen if DynamoDB Streams adds/changes its operation types
        log.warn("Unsupported operation type detected: " + eventName + ". Record: " + record);
        request = null;
    }
    if (null != request) {
        request.getRequestClientOptions().appendUserAgent(USER_AGENT);
    }
    return request;
}
 
/**
 * {@inheritDoc}
 */
@Override
public void fail(final List<Record> records) {
    if (isShutdown) {
        if (records.isEmpty()) {
            // This is OK (but not expected)
            log.warn("Emitter fail method called after shutdown method was called. Continuing because list is empty");
            return;
        } else {
            throw new IllegalStateException("Emitter fail method called after shutdown method was called.");
        }
    }
    for (Record record : records) {
        log.error("Could not emit record: " + record);
    }
    final AmazonCloudWatchAsync cloudwatch = CLOUDWATCH.get();
    if (null != cloudwatch) {
        final double failed = records.size();
        final MetricDatum recordsProcessedFailedDatum = new MetricDatum().withMetricName(RECORDS_FAILED).withValue(failed).withUnit(StandardUnit.Count)
            .withTimestamp(new Date());
        final PutMetricDataRequest request = new PutMetricDataRequest().withNamespace(applicationName).withMetricData(recordsProcessedFailedDatum);
        cloudwatch.putMetricDataAsync(request);
    }
}
 
@SuppressWarnings("deprecation")
@Test
public void insertTest() throws Exception {
    // Set up the buffer and do sanity checks
    buffer.clear();
    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM1_INSERT.getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(1, buffered.size());
    assertTrue(buffered.contains(ITEM1_INSERT));

    // Emit record
    resetAll(DYNAMODB);
    expectNew(AmazonDynamoDBAsyncClient.class, new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class}, anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class)).andReturn(DYNAMODB);
    DYNAMODB.putItemAsync(EasyMock.anyObject(PutItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER);

    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);
    IEmitter<Record> instance = createEmitterInstance();
    assertTrue(instance.emit(new UnmodifiableBuffer<Record>(buffer)).isEmpty());
    verifyAll();
}
 
@SuppressWarnings("deprecation")
@Test
public void modifyTest() throws Exception {
    // Set up the buffer and do sanity checks
    buffer.clear();
    buffer.consumeRecord(ITEM1_MODIFY, ITEM1_MODIFY.getDynamodb().getSizeBytes().intValue(), ITEM1_MODIFY.getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_MODIFY.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_MODIFY.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(1, buffered.size());
    assertTrue(buffered.contains(ITEM1_MODIFY));

    // Emit record
    resetAll(DYNAMODB);
    DYNAMODB.putItemAsync(EasyMock.anyObject(PutItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER);
    expectNew(AmazonDynamoDBAsyncClient.class, new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class}, anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class)).andReturn(DYNAMODB);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);
    IEmitter<Record> instance = createEmitterInstance();
    assertTrue(instance.emit(new UnmodifiableBuffer<Record>(buffer)).isEmpty());
    verifyAll();
}
 
@SuppressWarnings("deprecation")
@Test
public void removeTest() throws Exception {
    // Set up the buffer and do sanity checks
    buffer.clear();
    buffer.consumeRecord(ITEM1_REMOVE, ITEM1_REMOVE.getDynamodb().getSizeBytes().intValue(), ITEM1_REMOVE.getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_REMOVE.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_REMOVE.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(1, buffered.size());
    assertTrue(buffered.contains(ITEM1_REMOVE));

    // Emit record
    resetAll(DYNAMODB);
    DYNAMODB.deleteItemAsync(anyObject(DeleteItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER);
    expectNew(AmazonDynamoDBAsyncClient.class, new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class}, anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class)).andReturn(DYNAMODB);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);
    IEmitter<Record> instance = createEmitterInstance();
    assertTrue(instance.emit(new UnmodifiableBuffer<Record>(buffer)).isEmpty());
    verifyAll();
}
 
@SuppressWarnings("deprecation")
@Test
public void multipleRecordsEmitsTest() throws Exception {
    // Set up the buffer and do sanity checks
    buffer.clear();
    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM1_INSERT.getDynamodb().getSequenceNumber());
    buffer.consumeRecord(ITEM2_INSERT, ITEM2_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM2_INSERT.getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(), buffer.getFirstSequenceNumber());
    assertEquals(ITEM2_INSERT.getDynamodb().getSequenceNumber(), buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(2, buffered.size());
    assertTrue(buffered.contains(ITEM1_INSERT));
    assertTrue(buffered.contains(ITEM2_INSERT));

    // Emit record
    resetAll(DYNAMODB);
    DYNAMODB.putItemAsync(EasyMock.anyObject(PutItemRequest.class), anyObject(AsyncHandler.class));
    expectLastCall().andAnswer(SUCCESS_ANSWER).times(2);
    expectNew(AmazonDynamoDBAsyncClient.class, new Class<?>[] {AWSCredentialsProvider.class, ClientConfiguration.class, ExecutorService.class}, anyObject(AWSCredentialsProvider.class), anyObject(ClientConfiguration.class), anyObject(ExecutorService.class)).andReturn(DYNAMODB);
    DYNAMODB.setEndpoint(EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    replayAll(DYNAMODB);
    IEmitter<Record> instance = createEmitterInstance();
    assertTrue(instance.emit(new UnmodifiableBuffer<Record>(buffer)).isEmpty());
    verifyAll();
}
 
@Test
public void multipleRecordsTest() {
    DynamoDBBuffer buffer = new DynamoDBBuffer(new DynamoDBStreamsConnectorConfiguration(new Properties(), null));

    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM1_INSERT
        .getDynamodb().getSequenceNumber());
    buffer.consumeRecord(ITEM2_INSERT, ITEM2_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM2_INSERT
        .getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(),buffer.getFirstSequenceNumber());
    assertEquals(ITEM2_INSERT.getDynamodb().getSequenceNumber(),buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(2, buffered.size());
    assertTrue(buffered.contains(ITEM1_INSERT));
    assertTrue(buffered.contains(ITEM2_INSERT));
    assertTrue(buffer.shouldFlush());
}
 
@Test
public void dedupSanityTest() {
    DynamoDBBuffer buffer = new DynamoDBBuffer(new DynamoDBStreamsConnectorConfiguration(new Properties(), null));

    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM1_INSERT
        .getDynamodb().getSequenceNumber());
    buffer.consumeRecord(ITEM1_MODIFY, ITEM1_MODIFY.getDynamodb().getSizeBytes().intValue(), ITEM1_MODIFY
        .getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(),buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_MODIFY.getDynamodb().getSequenceNumber(),buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(1, buffered.size());
    assertTrue(buffered.contains(ITEM1_MODIFY));
    assertFalse(buffered.contains(ITEM1_INSERT));
    assertTrue(buffer.shouldFlush());
}
 
@Test
public void dedupMultipleRecordsTest() {
    DynamoDBBuffer buffer = new DynamoDBBuffer(new DynamoDBStreamsConnectorConfiguration(new Properties(), null));

    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM1_INSERT
        .getDynamodb().getSequenceNumber());
    buffer.consumeRecord(ITEM1_MODIFY, ITEM1_MODIFY.getDynamodb().getSizeBytes().intValue(), ITEM1_MODIFY
        .getDynamodb().getSequenceNumber());
    buffer.consumeRecord(ITEM2_INSERT, ITEM2_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM2_INSERT
        .getDynamodb().getSequenceNumber());
    buffer.consumeRecord(ITEM2_MODIFY, ITEM2_MODIFY.getDynamodb().getSizeBytes().intValue(), ITEM2_MODIFY
        .getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(),buffer.getFirstSequenceNumber());
    assertEquals(ITEM2_MODIFY.getDynamodb().getSequenceNumber(),buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(2, buffered.size());
    assertTrue(buffered.contains(ITEM1_MODIFY));
    assertFalse(buffered.contains(ITEM1_INSERT));
    assertTrue(buffered.contains(ITEM2_MODIFY));
    assertFalse(buffered.contains(ITEM2_INSERT));
    assertTrue(buffer.shouldFlush());
}
 
@Override
public IEmitter<Record> getEmitter(final KinesisConnectorConfiguration configuration) {
    if (configuration instanceof DynamoDBStreamsConnectorConfiguration) {
        return new DynamoDBReplicationEmitter((DynamoDBStreamsConnectorConfiguration) configuration);
    } else {
        throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
    }

}
 
@Override
public IBuffer<Record> getBuffer(final KinesisConnectorConfiguration configuration) {
    if (configuration instanceof DynamoDBStreamsConnectorConfiguration) {
        return new DynamoDBBuffer((DynamoDBStreamsConnectorConfiguration) configuration);
    } else {
        throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
    }
}
 
/**
 * {@inheritDoc}
 */
@Override
public Record toClass(final com.amazonaws.services.kinesis.model.Record record) throws IOException {
    if (record instanceof RecordAdapter) {
        return ((RecordAdapter) record).getInternalObject();
    } else {
        return MAPPER.readValue(new String(record.getData().array(), ENCODING), Record.class);
    }
}
 
/**
 * {@inheritDoc}
 */
@Override
public void consumeRecord(final Record record, final int recordBytes, final String sequenceNumber) {
    // Use HashMap to deduplicate using the DynamoDB key as the key.
    getBuffer().put(record.getDynamodb().getKeys(), record);
    // Sequence number bound maintenance
    setLastSequenceNumber(sequenceNumber);
    if (getFirstSequenceNumber() == null) {
        setFirstSequenceNumber(getLastSequenceNumber());
    }
    setProcessedRecords(getProcessedRecords() + 1);
    emitCloudWatchMetrics();
}
 
@Test
public void sanityTest() {
    DynamoDBBuffer buffer = new DynamoDBBuffer(new DynamoDBStreamsConnectorConfiguration(new Properties(), null));

    buffer.consumeRecord(ITEM1_INSERT, ITEM1_INSERT.getDynamodb().getSizeBytes().intValue(), ITEM1_INSERT
        .getDynamodb().getSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(),buffer.getFirstSequenceNumber());
    assertEquals(ITEM1_INSERT.getDynamodb().getSequenceNumber(),buffer.getLastSequenceNumber());
    List<Record> buffered = buffer.getRecords();
    assertEquals(1, buffered.size());
    assertTrue(buffered.contains(ITEM1_INSERT));
    assertTrue(buffer.shouldFlush());
}
 
@Override
protected IEmitter<Record> createEmitterInstance() {
    Properties properties = new Properties();
    properties.setProperty("APP_NAME", "TEST");
    properties.setProperty("DYNAMODB_ENDPOINT", "ENDPOINT");
    properties.setProperty("REGION_NAME", "REGION");
    properties.setProperty("DYNAMODB_DATA_TABLE_NAME", "TABLE");
    AWSStaticCredentialsProvider credentialProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("Access", "Secret"));
    return new DynamoDBReplicationEmitter(new DynamoDBStreamsConnectorConfiguration(properties, credentialProvider),
        new AmazonDynamoDBAsyncClient(credentialProvider, new ClientConfiguration().withMaxErrorRetry(0), Executors.newFixedThreadPool(MAX_THREADS)), null);
}
 
源代码16 项目: Flink-CEPplus   文件: DynamoDBStreamsSchema.java
@Override
public Record deserialize(byte[] message, String partitionKey, String seqNum,
		long approxArrivalTimestamp, String stream, String shardId) throws IOException {
	return MAPPER.readValue(message, Record.class);
}
 
源代码17 项目: Flink-CEPplus   文件: DynamoDBStreamsSchema.java
@Override
public TypeInformation<Record> getProducedType() {
	return TypeInformation.of(Record.class);
}
 
源代码18 项目: flink   文件: DynamoDBStreamsSchema.java
@Override
public Record deserialize(byte[] message, String partitionKey, String seqNum,
		long approxArrivalTimestamp, String stream, String shardId) throws IOException {
	return MAPPER.readValue(message, Record.class);
}
 
源代码19 项目: flink   文件: DynamoDBStreamsSchema.java
@Override
public TypeInformation<Record> getProducedType() {
	return TypeInformation.of(Record.class);
}
 
源代码20 项目: podyn   文件: DynamoDBTableReplicator.java
protected IRecordProcessor createStreamProcessor() {
	return new IRecordProcessor() {

		@Override
		public void initialize(InitializationInput initializationInput) {
		}

		public List<Record> extractDynamoStreamRecords(List<com.amazonaws.services.kinesis.model.Record> kinesisRecords) {
			List<Record> dynamoRecords = new ArrayList<>(kinesisRecords.size());

			for(com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) {
				if (kinesisRecord instanceof RecordAdapter) {
					Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject();
					dynamoRecords.add(dynamoRecord);
				}
			}

			return dynamoRecords;
		}

		@Override
		public void processRecords(ProcessRecordsInput processRecordsInput) {
			List<Record> records = extractDynamoStreamRecords(processRecordsInput.getRecords());

			DynamoDBTableReplicator.this.processRecords(records);

			checkpoint(processRecordsInput.getCheckpointer());
		}

		@Override
		public void shutdown(ShutdownInput shutdownInput) {
			if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
				checkpoint(shutdownInput.getCheckpointer());
			}
		}

		void checkpoint(IRecordProcessorCheckpointer checkpointer) {
			try {
				checkpointer.checkpoint();
			} catch (KinesisClientLibDependencyException|InvalidStateException|ThrottlingException|ShutdownException e) {
				LOG.warn(e);
			}
		}
	};
}
 
public Worker createWorker() {

        // use default credential provider chain to locate appropriate credentials
        final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();

        // initialize DynamoDB client and set the endpoint properly for source table / region
        final AmazonDynamoDB dynamodbClient = AmazonDynamoDBClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withEndpointConfiguration(createEndpointConfiguration(sourceRegion, sourceDynamodbEndpoint, AmazonDynamoDB.ENDPOINT_PREFIX))
                .build();

        // initialize Streams client
        final AwsClientBuilder.EndpointConfiguration streamsEndpointConfiguration = createEndpointConfiguration(sourceRegion,
                sourceDynamodbStreamsEndpoint, AmazonDynamoDBStreams.ENDPOINT_PREFIX);
        final ClientConfiguration streamsClientConfig = new ClientConfiguration().withGzip(false);
        final AmazonDynamoDBStreams streamsClient = AmazonDynamoDBStreamsClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withEndpointConfiguration(streamsEndpointConfiguration)
                .withClientConfiguration(streamsClientConfig)
                .build();

        // obtain the Stream ID associated with the source table
        final String streamArn = dynamodbClient.describeTable(sourceTable).getTable().getLatestStreamArn();
        final boolean streamEnabled = DynamoDBConnectorUtilities.isStreamsEnabled(streamsClient, streamArn, DynamoDBConnectorConstants.NEW_AND_OLD);
        Preconditions.checkArgument(streamArn != null, DynamoDBConnectorConstants.MSG_NO_STREAMS_FOUND);
        Preconditions.checkState(streamEnabled, DynamoDBConnectorConstants.STREAM_NOT_READY);

        // initialize DynamoDB client for KCL
        final AmazonDynamoDB kclDynamoDBClient = AmazonDynamoDBClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withEndpointConfiguration(createKclDynamoDbEndpointConfiguration())
                .build();

        // initialize DynamoDB Streams Adapter client and set the Streams endpoint properly
        final AmazonDynamoDBStreamsAdapterClient streamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient);

        // initialize CloudWatch client and set the region to emit metrics to
        final AmazonCloudWatch kclCloudWatchClient;
        if (isPublishCloudWatch) {
            kclCloudWatchClient = AmazonCloudWatchClientBuilder.standard()
                    .withCredentials(credentialsProvider)
                    .withRegion(kclRegion.or(sourceRegion).getName()).build();
        } else {
            kclCloudWatchClient = new NoopCloudWatch();
        }

        // try to get taskname from command line arguments, auto generate one if needed
        final AwsClientBuilder.EndpointConfiguration destinationEndpointConfiguration = createEndpointConfiguration(destinationRegion,
                destinationDynamodbEndpoint, AmazonDynamoDB.ENDPOINT_PREFIX);
        final String actualTaskName = DynamoDBConnectorUtilities.getTaskName(sourceRegion, destinationRegion, taskName, sourceTable, destinationTable);

        // set the appropriate Connector properties for the destination KCL configuration
        final Properties properties = new Properties();
        properties.put(DynamoDBStreamsConnectorConfiguration.PROP_APP_NAME, actualTaskName);
        properties.put(DynamoDBStreamsConnectorConfiguration.PROP_DYNAMODB_ENDPOINT, destinationEndpointConfiguration.getServiceEndpoint());
        properties.put(DynamoDBStreamsConnectorConfiguration.PROP_DYNAMODB_DATA_TABLE_NAME, destinationTable);
        properties.put(DynamoDBStreamsConnectorConfiguration.PROP_REGION_NAME, destinationRegion.getName());

        // create the record processor factory based on given pipeline and connector configurations
        // use the master to replicas pipeline
        final KinesisConnectorRecordProcessorFactory<Record, Record> factory = new KinesisConnectorRecordProcessorFactory<>(
                new DynamoDBMasterToReplicasPipeline(), new DynamoDBStreamsConnectorConfiguration(properties, credentialsProvider));

        // create the KCL configuration with default values
        final KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(actualTaskName,
                streamArn,
                credentialsProvider,
                DynamoDBConnectorConstants.WORKER_LABEL + actualTaskName + UUID.randomUUID().toString())
                // worker will use checkpoint table if available, otherwise it is safer
                // to start at beginning of the stream
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                // we want the maximum batch size to avoid network transfer latency overhead
                .withMaxRecords(getRecordsLimit.or(DynamoDBConnectorConstants.STREAMS_RECORDS_LIMIT))
                // wait a reasonable amount of time - default 0.5 seconds
                .withIdleTimeBetweenReadsInMillis(DynamoDBConnectorConstants.IDLE_TIME_BETWEEN_READS)
                // Remove calls to GetShardIterator
                .withValidateSequenceNumberBeforeCheckpointing(false)
                // make parent shard poll interval tunable to decrease time to run integration test
                .withParentShardPollIntervalMillis(parentShardPollIntervalMillis.or(DynamoDBConnectorConstants.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS))
                // avoid losing leases too often - default 60 seconds
                .withFailoverTimeMillis(DynamoDBConnectorConstants.KCL_FAILOVER_TIME);

        // create the KCL worker for this connector
        return new Worker(factory, kclConfig, streamsAdapterClient, kclDynamoDBClient, kclCloudWatchClient);
    }
 
@Override
public ITransformer<Record, Record> getTransformer(final KinesisConnectorConfiguration configuration) {
    return new DynamoDBStreamsRecordTransformer();
}
 
@Override
public IFilter<Record> getFilter(final KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<Record>();
}
 
/**
 * {@inheritDoc}
 */
@Override
public Record fromClass(final Record record) throws IOException {
    // since the emitter expects DynamoDB stream records, do nothing here
    return record;
}
 
/**
 * {@inheritDoc}
 */
@SuppressWarnings("unchecked")
@Override
public List<Record> emit(final UnmodifiableBuffer<Record> buffer) {
    if (isShutdown) {
        if (buffer.getRecords().isEmpty()) {
            // This is OK, but not expected
            log.warn("Record processor called emit after calling shutdown. Continuing becuase buffer is empty.");
            return Collections.emptyList();
        } else {
            throw new IllegalStateException("Cannot emit records after emitter has been shutdown.");
        }
    }
    // Asynchronously process all writes, but block on the results.
    List<Record> records = buffer.getRecords();
    // Stores records that failed with a non-retryable exception
    final List<Record> failedRecords = Collections.synchronizedList(new ArrayList<Record>());
    // Queue of records to submit
    final BlockingQueue<Record> toSubmit = new LinkedBlockingQueue<Record>(records);
    // Used to detect when all requests have either succeeded or resulted in a non-retryable exception
    final CountDownLatch doneSignal = new CountDownLatch(records.size());
    final AtomicInteger retryCount = new AtomicInteger();
    boolean interrupted = false;
    try {
        while (doneSignal.getCount() > 0) {
            Record recordToSubmit = null;
            try {
                recordToSubmit = toSubmit.poll(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            final Record record = recordToSubmit;
            if (null == record) {
                continue; // Check if all records have completed and if not try to poll again
            }
            // Generate the request based on the record
            AmazonWebServiceRequest request = createRequest(record);
            if (request == null) { // Should only happen if DynamoDB Streams API updates to support different operations
                                   // than {INSERT, MODIFY, REMOVE}.
                continue;
            }
            // Submit the write request based on its type
            if (request instanceof PutItemRequest) { // PUT
                getDynamodb().putItemAsync((PutItemRequest) request,
                    (AsyncHandler<PutItemRequest, PutItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else if (request instanceof DeleteItemRequest) { // DELETE
                getDynamodb().deleteItemAsync((DeleteItemRequest) request,
                    (AsyncHandler<DeleteItemRequest, DeleteItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else if (request instanceof UpdateItemRequest) { // UPDATE
                getDynamodb().updateItemAsync((UpdateItemRequest) request,
                    (AsyncHandler<UpdateItemRequest, UpdateItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else { // Should only happen if DynamoDB allows a new operation other than {PutItem, DeleteItem,
                     // UpdateItem} for single item writes.
                log.warn("Unsupported DynamoDB request: " + request);
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
    emitCloudWatchMetrics(records, failedRecords, retryCount);
    if (!records.isEmpty()) {
        log.debug("Successfully emitted " + (records.size() - failedRecords.size()) + " records ending with sequence number "
            + buffer.getLastSequenceNumber());
    } else {
        log.debug("No records to emit");
    }
    return failedRecords;
}
 
/**
 * {@inheritDoc}
 */
@Override
public List<Record> getRecords() {
    // Convert records to a list
    return new ArrayList<Record>(getBuffer().values());
}
 
/**
 * @return the buffer
 */
public Map<Map<String, AttributeValue>, Record> getBuffer() {
    return buffer;
}
 
@Override
protected IEmitter<Record> createEmitterInstance() {
    return new DynamoDBReplicationEmitter("TEST", "ENDPOINT", "REGION", "TABLE", null, new AWSStaticCredentialsProvider(new BasicAWSCredentials("Access", "Secret")));
}
 
源代码29 项目: flink   文件: DynamoDBStreamsSchema.java
@Override
public Record deserialize(byte[] message, String partitionKey, String seqNum,
		long approxArrivalTimestamp, String stream, String shardId) throws IOException {
	return MAPPER.readValue(message, Record.class);
}
 
源代码30 项目: flink   文件: DynamoDBStreamsSchema.java
@Override
public TypeInformation<Record> getProducedType() {
	return TypeInformation.of(Record.class);
}
 
 类方法
 同包方法