Source code examples for the class com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration

Listed below are code examples showing how to use the com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration API; you can also follow each project link to view the full source on GitHub.
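
Before the individual examples, here is a minimal sketch of the construction pattern they all share, using only what the test examples further down confirm: the class is built from a java.util.Properties object plus an AWSCredentialsProvider, keys left unset fall back to built-in defaults, and the resulting values are exposed as public fields. The specific limit values below are illustrative only.

import java.util.Properties;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

public class ConfigurationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Keys not set here keep the defaults defined by KinesisConnectorConfiguration.
        props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_RECORD_COUNT_LIMIT, "500");
        props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_BYTE_SIZE_LIMIT, "1048576");
        props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_MILLISECONDS_LIMIT, "30000");

        KinesisConnectorConfiguration config =
                new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());

        // Configured values surface as public fields, as the emitter and buffer
        // constructors below demonstrate.
        System.out.println("Record count limit: " + config.BUFFER_RECORD_COUNT_LIMIT);
    }
}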

public ElasticsearchEmitter(KinesisConnectorConfiguration configuration) {
    Settings settings =
            ImmutableSettings.settingsBuilder()
                    .put(ELASTICSEARCH_CLUSTER_NAME_KEY, configuration.ELASTICSEARCH_CLUSTER_NAME)
                    .put(ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY, configuration.ELASTICSEARCH_TRANSPORT_SNIFF)
                    .put(ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY,
                            configuration.ELASTICSEARCH_IGNORE_CLUSTER_NAME)
                    .put(ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY, configuration.ELASTICSEARCH_PING_TIMEOUT)
                    .put(ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY,
                            configuration.ELASTICSEARCH_NODE_SAMPLER_INTERVAL)
                    .build();
    elasticsearchEndpoint = configuration.ELASTICSEARCH_ENDPOINT;
    elasticsearchPort = configuration.ELASTICSEARCH_PORT;
    LOG.info("ElasticsearchEmitter using elasticsearch endpoint " + elasticsearchEndpoint + ":" + elasticsearchPort);
    elasticsearchClient = new TransportClient(settings);
    elasticsearchClient.addTransportAddress(new InetSocketTransportAddress(elasticsearchEndpoint, elasticsearchPort));
}
 
public RedshiftManifestEmitter(KinesisConnectorConfiguration configuration) {
    dataTable = configuration.REDSHIFT_DATA_TABLE;
    fileTable = configuration.REDSHIFT_FILE_TABLE;
    fileKeyColumn = configuration.REDSHIFT_FILE_KEY_COLUMN;
    dataDelimiter = configuration.REDSHIFT_DATA_DELIMITER;
    copyMandatory = configuration.REDSHIFT_COPY_MANDATORY;
    s3Bucket = configuration.S3_BUCKET;
    s3Endpoint = configuration.S3_ENDPOINT;
    s3Client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
    if (s3Endpoint != null) {
        s3Client.setEndpoint(s3Endpoint);
    }
    credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER;
    loginProps = new Properties();
    loginProps.setProperty("user", configuration.REDSHIFT_USERNAME);
    loginProps.setProperty("password", configuration.REDSHIFT_PASSWORD);
    redshiftURL = configuration.REDSHIFT_URL;
}
 
@Before
@SuppressWarnings("unchecked")
public void setUp() {
    // control object used to create mock dependencies
    control = EasyMock.createControl();

    // mock dependencies
    emitter = control.createMock(IEmitter.class);
    transformer = control.createMock(ITransformer.class);
    buffer = control.createMock(IBuffer.class);
    filter = control.createMock(IFilter.class);
    checkpointer = control.createMock(IRecordProcessorCheckpointer.class);
    // use a real configuration to get actual default values (not anything created by EasyMock)
    configuration = new KinesisConnectorConfiguration(new Properties(), new DefaultAWSCredentialsProviderChain());
}
 
@Before
public void setUp() {
    // object under test
    Properties props = new Properties();
    AWSCredentialsProvider creds = createMock(AWSCredentialsProvider.class);
    configuration = new KinesisConnectorConfiguration(props, creds);

    emitter = new ElasticsearchEmitter(configuration);

    buffer = createMock(UnmodifiableBuffer.class);
    mockBulkBuilder = createMock(BulkRequestBuilder.class);
    mockIndexBuilder = createMock(IndexRequestBuilder.class);
    mockFuture = createMock(ListenableActionFuture.class);
    mockBulkResponse = createMock(BulkResponse.class);

    // overwrite the elasticsearch client with a mock
    elasticsearchClientMock = createMock(TransportClient.class);

    setField(emitter, "elasticsearchClient", elasticsearchClientMock);
    // overwrite the default backoff period with 0 to speed up tests
    setField(emitter, "BACKOFF_PERIOD", 0);
}
 
@Override
public IEmitter<Record> getEmitter(final KinesisConnectorConfiguration configuration) {
    if (configuration instanceof DynamoDBStreamsConnectorConfiguration) {
        return new DynamoDBReplicationEmitter((DynamoDBStreamsConnectorConfiguration) configuration);
    } else {
        throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
    }
}
 
@Override
public IBuffer<Record> getBuffer(final KinesisConnectorConfiguration configuration) {
    if (configuration instanceof DynamoDBStreamsConnectorConfiguration) {
        return new DynamoDBBuffer((DynamoDBStreamsConnectorConfiguration) configuration);
    } else {
        throw new IllegalArgumentException(this + " needs a DynamoDBStreamsConnectorConfiguration argument.");
    }
}
 
Example 7: project aws-big-data-blog, file StreamSource.java
/**
 * Creates a new StreamSource.
 * 
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 * @param loopOverStreamSource
 *        Loop over the stream source to continually put records
 */
public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    this.config = config;
    this.inputFile = inputFile;
    this.loopOverInputFile = loopOverStreamSource;
    this.objectMapper = new ObjectMapper();
    kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    KinesisUtils.createInputStream(config);
}
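
A short usage sketch for the constructor above, assuming (as in the connector sample application) that StreamSource implements Runnable; the properties file name and input file name are hypothetical:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

public class StreamSourceSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("connector.properties")) { // hypothetical file
            props.load(in);
        }
        KinesisConnectorConfiguration config =
                new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());
        // Emit each line of the (hypothetical) input file once; pass true to loop forever.
        Thread sourceThread = new Thread(new StreamSource(config, "inputRecords.txt", false));
        sourceThread.start();
    }
}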
 
Example 8: project aws-big-data-blog, file KinesisUtils.java
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM,
 * with config.KINESIS_INPUT_STREAM_SHARD_COUNT shards.
 * 
 * @param config
 *        The configuration with the specified input stream name, shard count, and {@link AWSCredentialsProvider}
 */
public static void createInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_INPUT_STREAM,
            config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
 
Example 9: project aws-big-data-blog, file KinesisUtils.java
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM,
 * with config.KINESIS_OUTPUT_STREAM_SHARD_COUNT shards.
 * 
 * @param config
 *        The configuration with the specified output stream name, shard count, and {@link AWSCredentialsProvider}
 */
public static void createOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_OUTPUT_STREAM,
            config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
 
Example 10: project aws-big-data-blog, file KinesisUtils.java
/**
 * Deletes the input stream specified by config.KINESIS_INPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_INPUT_STREAM);
}
 
Example 11: project aws-big-data-blog, file KinesisUtils.java
/**
 * Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_OUTPUT_STREAM);
}
 
public DynamoDBEmitter(KinesisConnectorConfiguration configuration) {
    // Amazon DynamoDB Config
    this.dynamoDBEndpoint = configuration.DYNAMODB_ENDPOINT;
    this.dynamoDBTableName = configuration.DYNAMODB_DATA_TABLE_NAME;
    // Client
    this.dynamoDBClient = new AmazonDynamoDBClient(configuration.AWS_CREDENTIALS_PROVIDER);
    this.dynamoDBClient.setEndpoint(this.dynamoDBEndpoint);
}
 
public S3ManifestEmitter(KinesisConnectorConfiguration configuration) {
    super(configuration);
    manifestStream = configuration.KINESIS_OUTPUT_STREAM;
    kinesisClient = new AmazonKinesisClient(configuration.AWS_CREDENTIALS_PROVIDER);
    if (configuration.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(configuration.KINESIS_ENDPOINT);
    }
}
 
Example 14: project amazon-kinesis-connectors, file S3Emitter.java
public S3Emitter(KinesisConnectorConfiguration configuration) {
    s3Bucket = configuration.S3_BUCKET;
    s3Endpoint = configuration.S3_ENDPOINT;
    s3client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
    if (s3Endpoint != null) {
        s3client.setEndpoint(s3Endpoint);
    }
}
 
public BasicMemoryBuffer(KinesisConnectorConfiguration configuration, List<T> buffer) {
    bytesPerFlush = configuration.BUFFER_BYTE_SIZE_LIMIT;
    numMessagesToBuffer = configuration.BUFFER_RECORD_COUNT_LIMIT;
    millisecondsToBuffer = configuration.BUFFER_MILLISECONDS_LIMIT;
    this.buffer = buffer;
    byteCount = new AtomicLong();
    previousFlushTimeMillisecond = getCurrentTimeMilliseconds();
}
 
public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) {
    super(configuration);
    s3bucket = configuration.S3_BUCKET;
    redshiftTable = configuration.REDSHIFT_DATA_TABLE;
    redshiftDelimiter = configuration.REDSHIFT_DATA_DELIMITER;
    redshiftURL = configuration.REDSHIFT_URL;
    loginProperties = new Properties();
    loginProperties.setProperty("user", configuration.REDSHIFT_USERNAME);
    loginProperties.setProperty("password", configuration.REDSHIFT_PASSWORD);
    accessKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSAccessKeyId();
    secretKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSSecretKey();
}
 
@Before
public void setUp() {
    control = EasyMock.createControl();
    // empty properties, so default configuration values are used
    Properties props = new Properties();

    // Mock a credentials provider for the constructor argument
    credsProvider = control.createMock(AWSCredentialsProvider.class);
    config = new KinesisConnectorConfiguration(props, credsProvider);
}
 
@Before
public void setUp() throws Exception {
    // Setup the config
    Properties props = new Properties();
    props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_RECORD_COUNT_LIMIT,
            Integer.toString(buffRecCount));
    props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_BYTE_SIZE_LIMIT,
            Integer.toString(buffByteLim));
    props.setProperty(KinesisConnectorConfiguration.PROP_BUFFER_MILLISECONDS_LIMIT,
            Long.toString(buffTimeMilliLim));
    config = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());

    control = EasyMock.createControl();
}
 
Example 19: project amazon-kinesis-connectors, file StreamSource.java
/**
 * Creates a new StreamSource.
 * 
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 * @param loopOverStreamSource
 *        Loop over the stream source to continually put records
 */
public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    this.config = config;
    this.inputFile = inputFile;
    this.loopOverInputFile = loopOverStreamSource;
    this.objectMapper = new ObjectMapper();
    kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    KinesisUtils.createInputStream(config);
}
 
Example 20: project amazon-kinesis-connectors, file KinesisUtils.java
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM,
 * with config.KINESIS_INPUT_STREAM_SHARD_COUNT shards.
 * 
 * @param config
 *        The configuration with the specified input stream name, shard count, and {@link AWSCredentialsProvider}
 */
public static void createInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_INPUT_STREAM,
            config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
 
Example 21: project amazon-kinesis-connectors, file KinesisUtils.java
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM,
 * with config.KINESIS_OUTPUT_STREAM_SHARD_COUNT shards.
 * 
 * @param config
 *        The configuration with the specified output stream name, shard count, and {@link AWSCredentialsProvider}
 */
public static void createOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_OUTPUT_STREAM,
            config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
 
Example 22: project amazon-kinesis-connectors, file KinesisUtils.java
/**
 * Deletes the input stream specified by config.KINESIS_INPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_INPUT_STREAM);
}
 
Example 23: project amazon-kinesis-connectors, file KinesisUtils.java
/**
 * Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_OUTPUT_STREAM);
}
 
@Override
public ITransformerBase<KinesisMessageModel, ElasticsearchObject>
        getTransformer(KinesisConnectorConfiguration configuration) {
    if (configuration.BATCH_RECORDS_IN_PUT_REQUEST) {
        return new BatchedKinesisMessageModelElasticsearchTransformer();
    } else {
        return new SingleKinesisMessageModelElasticsearchTransformer();
    }
}
 
@Override
public ITransformer<Record, Record> getTransformer(final KinesisConnectorConfiguration configuration) {
    return new DynamoDBStreamsRecordTransformer();
}
 
@Override
public IFilter<Record> getFilter(final KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<Record>();
}
 
Example 27: project aws-big-data-blog, file BatchedStreamSource.java
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile) {
    this(config, inputFile, false);
}
 
Example 28: project aws-big-data-blog, file BatchedStreamSource.java
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    super(config, inputFile, loopOverStreamSource);
    buffer = new ArrayList<KinesisMessageModel>();
}
 
Example 29: project aws-big-data-blog, file HBasePipeline.java
@Override
public IEmitter<Map<String,String>> getEmitter(KinesisConnectorConfiguration configuration) {
    return new HBaseEmitter((EMRHBaseKinesisConnectorConfiguration) configuration);
}
 
Example 30: project aws-big-data-blog, file HBasePipeline.java
@Override
public IBuffer<KinesisMessageModel> getBuffer(KinesisConnectorConfiguration configuration) {
    return new BasicMemoryBuffer<KinesisMessageModel>(configuration);
}
 