下面列出了怎么用com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration的API类实例代码及写法,或者点击链接到github查看源代码。
public ElasticsearchEmitter(KinesisConnectorConfiguration configuration) {
Settings settings =
ImmutableSettings.settingsBuilder()
.put(ELASTICSEARCH_CLUSTER_NAME_KEY, configuration.ELASTICSEARCH_CLUSTER_NAME)
.put(ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY, configuration.ELASTICSEARCH_TRANSPORT_SNIFF)
.put(ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY,
configuration.ELASTICSEARCH_IGNORE_CLUSTER_NAME)
.put(ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY, configuration.ELASTICSEARCH_PING_TIMEOUT)
.put(ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY,
configuration.ELASTICSEARCH_NODE_SAMPLER_INTERVAL)
.build();
elasticsearchEndpoint = configuration.ELASTICSEARCH_ENDPOINT;
elasticsearchPort = configuration.ELASTICSEARCH_PORT;
LOG.info("ElasticsearchEmitter using elasticsearch endpoint " + elasticsearchEndpoint + ":" + elasticsearchPort);
elasticsearchClient = new TransportClient(settings);
elasticsearchClient.addTransportAddress(new InetSocketTransportAddress(elasticsearchEndpoint, elasticsearchPort));
}
public RedshiftManifestEmitter(KinesisConnectorConfiguration configuration) {
dataTable = configuration.REDSHIFT_DATA_TABLE;
fileTable = configuration.REDSHIFT_FILE_TABLE;
fileKeyColumn = configuration.REDSHIFT_FILE_KEY_COLUMN;
dataDelimiter = configuration.REDSHIFT_DATA_DELIMITER;
copyMandatory = configuration.REDSHIFT_COPY_MANDATORY;
s3Bucket = configuration.S3_BUCKET;
s3Endpoint = configuration.S3_ENDPOINT;
s3Client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
if (s3Endpoint != null) {
s3Client.setEndpoint(s3Endpoint);
}
credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER;
loginProps = new Properties();
loginProps.setProperty("user", configuration.REDSHIFT_USERNAME);
loginProps.setProperty("password", configuration.REDSHIFT_PASSWORD);
redshiftURL = configuration.REDSHIFT_URL;
}
@Before
@SuppressWarnings("unchecked")
public void setUp() {
// control object used to create mock dependencies
control = EasyMock.createControl();
// mock dependencies
emitter = control.createMock(IEmitter.class);
transformer = control.createMock(ITransformer.class);
buffer = control.createMock(IBuffer.class);
filter = control.createMock(IFilter.class);
checkpointer = control.createMock(IRecordProcessorCheckpointer.class);
// use a real configuration to get actual default values (not anything created by EasyMock)
configuration = new KinesisConnectorConfiguration(new Properties(), new DefaultAWSCredentialsProviderChain());
}
@Before
public void setUp() {
// object under test
Properties props = new Properties();
AWSCredentialsProvider creds = createMock(AWSCredentialsProvider.class);
configuration = new KinesisConnectorConfiguration(props, creds);
emitter = new ElasticsearchEmitter(configuration);
buffer = createMock(UnmodifiableBuffer.class);
mockBulkBuilder = createMock(BulkRequestBuilder.class);
mockIndexBuilder = createMock(IndexRequestBuilder.class);
mockFuture = createMock(ListenableActionFuture.class);
mockBulkResponse = createMock(BulkResponse.class);
// overwrite the elasticseach client with a mock
elasticsearchClientMock = createMock(TransportClient.class);
setField(emitter, "elasticsearchClient", elasticsearchClientMock);
// overwrite the default backoff time with 0 seconds to speed up tests
setField(emitter, "BACKOFF_PERIOD", 0);
}
@Override
public IEmitter<Record> getEmitter(final KinesisConnectorConfiguration configuration) {
if (configuration instanceof DynamoDBStreamsConnectorConfiguration) {
return new DynamoDBReplicationEmitter((DynamoDBStreamsConnectorConfiguration) configuration);
} else {
throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
}
}
@Override
public IBuffer<Record> getBuffer(final KinesisConnectorConfiguration configuration) {
if (configuration instanceof DynamoDBStreamsConnectorConfiguration) {
return new DynamoDBBuffer((DynamoDBStreamsConnectorConfiguration) configuration);
} else {
throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
}
}
/**
* Creates a new StreamSource.
*
* @param config
* Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
* @param inputFile
* File containing record data to emit on each line
* @param loopOverStreamSource
* Loop over the stream source to continually put records
*/
public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
this.config = config;
this.inputFile = inputFile;
this.loopOverInputFile = loopOverStreamSource;
this.objectMapper = new ObjectMapper();
kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
KinesisUtils.createInputStream(config);
}
/**
* Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM
*
* @param config
* The configuration with the specified input stream name and {@link AWSCredentialsProvider}
* @param shardCount
* The shard count to create the stream with
*/
public static void createInputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
createAndWaitForStreamToBecomeAvailable(kinesisClient,
config.KINESIS_INPUT_STREAM,
config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
/**
* Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM.
*
* @param config
* The configuration with the specified output stream name and {@link AWSCredentialsProvider}
* @param shardCount
* The shard count to create the stream with
*/
public static void createOutputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
createAndWaitForStreamToBecomeAvailable(kinesisClient,
config.KINESIS_OUTPUT_STREAM,
config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
/**
* Deletes the input stream specified by config.KINESIS_INPUT_STREAM
*
* @param config
* The configuration containing the stream name and {@link AWSCredentialsProvider}
*/
public static void deleteInputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
deleteStream(kinesisClient, config.KINESIS_INPUT_STREAM);
}
/**
* Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM
*
* @param config
* The configuration containing the stream name and {@link AWSCredentialsProvider}
*/
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
deleteStream(kinesisClient, config.KINESIS_OUTPUT_STREAM);
}
public DynamoDBEmitter(KinesisConnectorConfiguration configuration) {
// Amazon DynamoDB Config
this.dynamoDBEndpoint = configuration.DYNAMODB_ENDPOINT;
this.dynamoDBTableName = configuration.DYNAMODB_DATA_TABLE_NAME;
// Client
this.dynamoDBClient = new AmazonDynamoDBClient(configuration.AWS_CREDENTIALS_PROVIDER);
this.dynamoDBClient.setEndpoint(this.dynamoDBEndpoint);
}
public S3ManifestEmitter(KinesisConnectorConfiguration configuration) {
super(configuration);
manifestStream = configuration.KINESIS_OUTPUT_STREAM;
kinesisClient = new AmazonKinesisClient(configuration.AWS_CREDENTIALS_PROVIDER);
if (configuration.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(configuration.KINESIS_ENDPOINT);
}
}
public S3Emitter(KinesisConnectorConfiguration configuration) {
s3Bucket = configuration.S3_BUCKET;
s3Endpoint = configuration.S3_ENDPOINT;
s3client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
if (s3Endpoint != null) {
s3client.setEndpoint(s3Endpoint);
}
}
public BasicMemoryBuffer(KinesisConnectorConfiguration configuration, List<T> buffer) {
bytesPerFlush = configuration.BUFFER_BYTE_SIZE_LIMIT;
numMessagesToBuffer = configuration.BUFFER_RECORD_COUNT_LIMIT;
millisecondsToBuffer = configuration.BUFFER_MILLISECONDS_LIMIT;
this.buffer = buffer;
byteCount = new AtomicLong();
previousFlushTimeMillisecond = getCurrentTimeMilliseconds();
}
public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) {
super(configuration);
s3bucket = configuration.S3_BUCKET;
redshiftTable = configuration.REDSHIFT_DATA_TABLE;
redshiftDelimiter = configuration.REDSHIFT_DATA_DELIMITER;
redshiftURL = configuration.REDSHIFT_URL;
loginProperties = new Properties();
loginProperties.setProperty("user", configuration.REDSHIFT_USERNAME);
loginProperties.setProperty("password", configuration.REDSHIFT_PASSWORD);
accessKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSAccessKeyId();
secretKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSSecretKey();
}
@Before
public void setUp() {
control = EasyMock.createControl();
// mock properties
Properties props = new Properties();
// Mock a redentials provider for constructor arg
credsProvider = control.createMock(AWSCredentialsProvider.class);
config = new KinesisConnectorConfiguration(props, credsProvider);
}
@Before
public void setUp() throws Exception {
// Setup the config
Properties props = new Properties();
props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_RECORD_COUNT_LIMIT,
Integer.toString(buffRecCount));
props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_BYTE_SIZE_LIMIT,
Integer.toString(buffByteLim));
props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_MILLISECONDS_LIMIT,
Long.toString(buffTimeMilliLim));
config = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());
control = EasyMock.createControl();
}
/**
* Creates a new StreamSource.
*
* @param config
* Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
* @param inputFile
* File containing record data to emit on each line
* @param loopOverStreamSource
* Loop over the stream source to continually put records
*/
public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
this.config = config;
this.inputFile = inputFile;
this.loopOverInputFile = loopOverStreamSource;
this.objectMapper = new ObjectMapper();
kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
KinesisUtils.createInputStream(config);
}
/**
* Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM
*
* @param config
* The configuration with the specified input stream name and {@link AWSCredentialsProvider}
* @param shardCount
* The shard count to create the stream with
*/
public static void createInputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
createAndWaitForStreamToBecomeAvailable(kinesisClient,
config.KINESIS_INPUT_STREAM,
config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
/**
* Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM.
*
* @param config
* The configuration with the specified output stream name and {@link AWSCredentialsProvider}
* @param shardCount
* The shard count to create the stream with
*/
public static void createOutputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
createAndWaitForStreamToBecomeAvailable(kinesisClient,
config.KINESIS_OUTPUT_STREAM,
config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
/**
* Deletes the input stream specified by config.KINESIS_INPUT_STREAM
*
* @param config
* The configuration containing the stream name and {@link AWSCredentialsProvider}
*/
public static void deleteInputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
deleteStream(kinesisClient, config.KINESIS_INPUT_STREAM);
}
/**
* Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM
*
* @param config
* The configuration containing the stream name and {@link AWSCredentialsProvider}
*/
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
if (config.KINESIS_ENDPOINT != null) {
kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
}
deleteStream(kinesisClient, config.KINESIS_OUTPUT_STREAM);
}
@Override
public ITransformerBase<KinesisMessageModel, ElasticsearchObject>
getTransformer(KinesisConnectorConfiguration configuration) {
if (configuration.BATCH_RECORDS_IN_PUT_REQUEST) {
return new BatchedKinesisMessageModelElasticsearchTransformer();
} else {
return new SingleKinesisMessageModelElasticsearchTransformer();
}
}
@Override
public ITransformer<Record, Record> getTransformer(final KinesisConnectorConfiguration configuration) {
return new DynamoDBStreamsRecordTransformer();
}
@Override
public IFilter<Record> getFilter(final KinesisConnectorConfiguration configuration) {
return new AllPassFilter<Record>();
}
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile) {
this(config, inputFile, false);
}
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
super(config, inputFile, loopOverStreamSource);
buffer = new ArrayList<KinesisMessageModel>();
}
@Override
public IEmitter<Map<String,String>> getEmitter(KinesisConnectorConfiguration configuration) {
return new HBaseEmitter((EMRHBaseKinesisConnectorConfiguration) configuration);
}
@Override
public IBuffer<KinesisMessageModel> getBuffer(KinesisConnectorConfiguration configuration) {
return new BasicMemoryBuffer<KinesisMessageModel>(configuration);
}