类org.apache.kafka.common.config.ConfigDef.Type源码实例Demo

下面列出了怎么用org.apache.kafka.common.config.ConfigDef.Type的API类实例代码及写法,或者点击链接到github查看源代码。

public static ConfigDef conf() {
  return new ConfigDef()
      .define(DB_NAME_ALIAS, Type.STRING, Importance.HIGH, "Db Name Alias")
      .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "Topic")
      .define(DB_NAME_CONFIG, Type.STRING, Importance.HIGH, "Db Name")
      .define(DB_HOST_NAME_CONFIG,Type.STRING,Importance.HIGH,"Db HostName")
      .define(DB_PORT_CONFIG,Type.INT,Importance.HIGH,"Db Port")
      .define(DB_USER_CONFIG,Type.STRING,Importance.HIGH,"Db User")
      .define(DB_USER_PASSWORD_CONFIG,Type.STRING,Importance.HIGH,"Db User Password")
      .define(TABLE_WHITELIST,Type.STRING,Importance.HIGH,"TAbles will be mined")
      .define(PARSE_DML_DATA,Type.BOOLEAN,Importance.HIGH,"Parse DML Data")
      .define(DB_FETCH_SIZE,Type.INT,Importance.HIGH,"Database Record Fetch Size")
      .define(RESET_OFFSET,Type.BOOLEAN,Importance.HIGH,"Reset Offset")
      .define(START_SCN,Type.STRING,"",Importance.LOW,"Start SCN")
      .define(MULTITENANT, Type.BOOLEAN, Importance.HIGH, "Database is multitenant (container)")
      .define(TABLE_BLACKLIST, Type.STRING, Importance.LOW, "Table will not be mined");
}
 
public static ConfigDef conf() {
    int order = 0;
    return new ConfigDef()
            .define(
                    FS_URIS,
                    Type.LIST,
                    ConfigDef.NO_DEFAULT_VALUE,
                    Importance.HIGH,
                    FS_URIS_DOC,
                    CONNECTOR_GROUP,
                    ++order,
                    ConfigDef.Width.LONG,
                    FS_URIS_DISPLAY
            ).define(
                    TOPIC,
                    Type.STRING,
                    ConfigDef.NO_DEFAULT_VALUE,
                    Importance.HIGH,
                    TOPIC_DOC,
                    CONNECTOR_GROUP,
                    ++order,
                    ConfigDef.Width.LONG,
                    TOPIC_DISPLAY
            );
}
 
private static void addModeOptions(ConfigDef config) {
    int orderInGroup = 0;
    config.define(
            MODE_CONFIG,
            Type.STRING,
            MODE_UNSPECIFIED,
            ConfigDef.ValidString.in(
                    MODE_UNSPECIFIED,
                    MODE_BULK,
                    MODE_TIMESTAMP,
                    MODE_INCREMENTING,
                    MODE_TIMESTAMP_INCREMENTING
            ),
            Importance.HIGH,
            MODE_DOC,
            MODE_GROUP,
            ++orderInGroup,
            Width.MEDIUM,
            MODE_DISPLAY,
            Collections.singletonList(
                    INCREMENTING_FIELD_NAME_CONFIG
            )
    ).define(
            INCREMENTING_FIELD_NAME_CONFIG,
            Type.STRING,
            INCREMENTING_FIELD_NAME_DEFAULT,
            Importance.MEDIUM,
            INCREMENTING_FIELD_NAME_DOC,
            MODE_GROUP,
            ++orderInGroup,
            Width.MEDIUM,
            INCREMENTING_FIELD_NAME_DISPLAY
    );
}
 
private static void addConnectorOptions(ConfigDef config) {
    int orderInGroup = 0;
    config.define(
            POLL_INTERVAL_MS_CONFIG,
            Type.STRING,
            POLL_INTERVAL_MS_DEFAULT,
            Importance.HIGH,
            POLL_INTERVAL_MS_DOC,
            CONNECTOR_GROUP,
            ++orderInGroup,
            Width.SHORT,
            POLL_INTERVAL_MS_DISPLAY
    ).define(
            BATCH_MAX_ROWS_CONFIG,
            Type.STRING,
            BATCH_MAX_ROWS_DEFAULT,
            Importance.LOW,
            BATCH_MAX_ROWS_DOC,
            CONNECTOR_GROUP,
            ++orderInGroup,
            Width.SHORT,
            BATCH_MAX_ROWS_DISPLAY
    ).define(
            TOPIC_PREFIX_CONFIG,
            Type.STRING,
            Importance.HIGH,
            TOPIC_PREFIX_DOC,
            CONNECTOR_GROUP,
            ++orderInGroup,
            Width.MEDIUM,
            TOPIC_PREFIX_DISPLAY
    );
}
 
public static ConfigDef conf() {
  String group = "REST_HTTP";
  int orderInGroup = 0;
  return new ConfigDef()
    .define(TOPIC_LIST_CONFIG,
      Type.LIST,
      NO_DEFAULT_VALUE,
      Importance.HIGH,
      TOPIC_LIST_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      TOPIC_LIST_DISPLAY)
    ;
}
 
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
  String group = "REST_HTTP";
  int orderInGroup = 0;
  ConfigDef config = new ConfigDef()
    .define(RESPONSE_VAR_NAMES_CONFIG,
      Type.LIST,
      RESPONSE_VAR_NAMES_DEFAULT,
      Importance.LOW,
      RESPONSE_VAR_NAMES_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      RESPONSE_VAR_NAMES_DISPLAY)
    ;

  // This is a bit hacky and there may be a better way of doing it, but I don't know it.
  // We need to create config items dynamically, based on the parameter names,
  // so we need a 2 pass parse of the config.
  List<String> varNames = (List) config.parse(unparsedConfig).get(RESPONSE_VAR_NAMES_CONFIG);

  for(String varName : varNames) {
    config.define(String.format(RESPONSE_VAR_XPATH_CONFIG, varName),
      Type.STRING,
      RESPONSE_VAR_XPATH_DEFAULT,
      Importance.HIGH,
      String.format(RESPONSE_VAR_XPATH_DOC, varName),
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      String.format(RESPONSE_VAR_XPATH_DISPLAY, varName));
  }

  return(config);
}
 
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
  String group = "REST_HTTP";
  int orderInGroup = 0;
  ConfigDef config = new ConfigDef()
    .define(RESPONSE_VAR_NAMES_CONFIG,
      Type.LIST,
      RESPONSE_VAR_NAMES_DEFAULT,
      Importance.LOW,
      RESPONSE_VAR_NAMES_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      RESPONSE_VAR_NAMES_DISPLAY)
    ;

  // This is a bit hacky and there may be a better way of doing it, but I don't know it.
  // We need to create config items dynamically, based on the parameter names,
  // so we need a 2 pass parse of the config.
  List<String> varNames = (List) config.parse(unparsedConfig).get(RESPONSE_VAR_NAMES_CONFIG);

  for(String varName : varNames) {
    config.define(String.format(RESPONSE_VAR_REGEX_CONFIG, varName),
      Type.STRING,
      RESPONSE_VAR_REGEX_DEFAULT,
      Importance.HIGH,
      String.format(RESPONSE_VAR_REGEX_DOC, varName),
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      String.format(RESPONSE_VAR_REGEX_DISPLAY, varName));
  }

  return(config);
}
 
public static void defineClientGroup(final ConfigDef definitions) {
  int order = 0;

  definitions
      .define(
          BROKER_CONTACTPOINT_CONFIG,
          Type.STRING,
          BROKER_CONTACTPOINT_DEFAULT,
          Importance.HIGH,
          BROKER_CONTACTPOINT_DOC,
          CLIENT_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Broker contact point")
      .define(
          REQUEST_TIMEOUT_CONFIG,
          Type.LONG,
          REQUEST_TIMEOUT_DEFAULT,
          Importance.LOW,
          REQUEST_TIMEOUT_DOC,
          CLIENT_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Request timeout")
      .define(
          USE_PLAINTEXT_CONFIG,
          Type.BOOLEAN,
          USE_PLAINTEXT_DEFAULT,
          Importance.LOW,
          USE_PLAINTEXT_DOC,
          CLIENT_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Use plaintext connection");
}
 
public static ConfigDef conf() {
    return new ConfigDef()
            .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC)
            .define(OWNER_CONFIG, Type.STRING, Importance.HIGH, OWNER_DOC)
            .define(REPO_CONFIG, Type.STRING, Importance.HIGH, REPO_DOC)
            .define(BATCH_SIZE_CONFIG, Type.INT, 100, new BatchSizeValidator(), Importance.LOW, BATCH_SIZE_DOC)
            .define(SINCE_CONFIG, Type.STRING, ZonedDateTime.now().minusYears(1).toInstant().toString(),
                    new TimestampValidator(), Importance.HIGH, SINCE_DOC)
            .define(AUTH_USERNAME_CONFIG, Type.STRING, "", Importance.HIGH, AUTH_USERNAME_DOC)
            .define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC);
}
 
public static ConfigDef conf() {
  return new ConfigDef()
      .define(TWITTER_DEBUG_CONF, Type.BOOLEAN, false, Importance.LOW, TWITTER_DEBUG_DOC)
      .define(TWITTER_OAUTH_CONSUMER_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_CONSUMER_KEY_DOC)
      .define(TWITTER_OAUTH_SECRET_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_SECRET_KEY_DOC)
      .define(TWITTER_OAUTH_ACCESS_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_DOC)
      .define(TWITTER_OAUTH_ACCESS_TOKEN_SECRET_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_SECRET_DOC)
      .define(FILTER_KEYWORDS_CONF, Type.LIST, Importance.HIGH, FILTER_KEYWORDS_DOC)
      .define(
          ConfigKeyBuilder.of(FILTER_USER_IDS_CONF, Type.LIST)
              .importance(Importance.HIGH)
              .documentation(FILTER_USER_IDS_DOC)
              .defaultValue(Collections.emptyList())
              .validator(USERID_VALIDATOR)
              .build()
      )
      .define(KAFKA_STATUS_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_STATUS_TOPIC_DOC)
      .define(PROCESS_DELETES_CONF, Type.BOOLEAN, Importance.HIGH, PROCESS_DELETES_DOC)
      .define(
          ConfigKeyBuilder.of(QUEUE_EMPTY_MS_CONF, Type.INT)
              .importance(Importance.LOW)
              .documentation(QUEUE_EMPTY_MS_DOC)
              .defaultValue(100)
              .validator(ConfigDef.Range.atLeast(10))
              .build()
      )
      .define(
          ConfigKeyBuilder.of(QUEUE_BATCH_SIZE_CONF, Type.INT)
              .importance(Importance.LOW)
              .documentation(QUEUE_BATCH_SIZE_DOC)
              .defaultValue(100)
              .validator(ConfigDef.Range.atLeast(1))
              .build()
      );
}
 
/**
 * Create default mConfig.
 * @return default mConfig
 */
public static ConfigDef baseConfigDef() {
    return new ConfigDef()
            .define(MqttSourceConstant.KAFKA_TOPIC, Type.STRING, "mqtt", Importance.LOW,
                    "Kafka topic to put received data \n Depends on message processor")
            .define(MqttSourceConstant.MQTT_CLIENT_ID, Type.STRING, null, Importance.MEDIUM,
                    "mqtt client id to use don't set to use random")
            .define(MqttSourceConstant.MQTT_CLEAN_SESSION, Type.BOOLEAN, true, Importance.HIGH,
                    "use clean session in connection?")
            .define(MqttSourceConstant.MQTT_CONNECTION_TIMEOUT, Type.INT, 30, Importance.LOW,
                    "connection timeout to use")
            .define(MqttSourceConstant.MQTT_KEEP_ALIVE_INTERVAL, Type.INT, 60, Importance.LOW,
                    "keepalive interval to use")
            .define(MqttSourceConstant.MQTT_SERVER_URIS, Type.STRING,
                    "tcp://localhost:1883", Importance.HIGH,
                    "mqtt server to connect to")
            .define(MqttSourceConstant.MQTT_TOPIC, Type.STRING, "#", Importance.HIGH,
                    "mqtt server to connect to")
            .define(MqttSourceConstant.MQTT_QUALITY_OF_SERVICE, Type.INT, 1, Importance.LOW,
                    "mqtt qos to use")
            .define(MqttSourceConstant.MQTT_SSL_CA_CERT, Type.STRING, null, Importance.LOW,
                    "CA cert file to use if using ssl",
                    "SSL", 1, ConfigDef.Width.LONG, "CA cert", MODE_SSL_RECOMMENDER)
            .define(MqttSourceConstant.MQTT_SSL_CERT, Type.STRING, null, Importance.LOW,
                    "cert file to use if using ssl",
                    "SSL", 2, ConfigDef.Width.LONG, "Cert", MODE_SSL_RECOMMENDER)
            .define(MqttSourceConstant.MQTT_SSL_PRIV_KEY, Type.STRING, null, Importance.LOW,
                    "cert priv key to use if using ssl",
                    "SSL", 3, ConfigDef.Width.LONG, "Key", MODE_SSL_RECOMMENDER)
            .define(MqttSourceConstant.MQTT_USERNAME, Type.STRING, null, Importance.MEDIUM,
                    "username to authenticate to mqtt broker")
            .define(MqttSourceConstant.MQTT_PASSWORD, Type.STRING, null, Importance.MEDIUM,
                    "password to authenticate to mqtt broker")
            .define(MqttSourceConstant.MESSAGE_PROCESSOR, Type.CLASS,
                    DumbProcessor.class, Importance.HIGH,
                    "message processor to use");
}
 
public static ConfigDef conf() {
  return new ConfigDef()
      .define(PORT_CONF, Type.INT, 8088, Importance.HIGH, PORT_DOC)
      .define(SSL_RENEGOTIATION_ALLOWED_CONF, Type.BOOLEAN, true, Importance.LOW, SSL_RENEGOTIATION_ALLOWED_DOC)
      .define(KEYSTORE_PATH_CONF, Type.STRING, Importance.HIGH, KEYSTORE_PATH_DOC)
      .define(KEYSTORE_PASSWORD_CONF, Type.PASSWORD, Importance.HIGH, KEYSTORE_PASSWORD_DOC)
      .define(EVENT_COLLECTOR_URL_CONF, Type.STRING, "/services/collector/event", Importance.LOW, EVENT_COLLECTOR_URL_DOC)
      .define(EVENT_COLLECTOR_INDEX_ALLOWED_CONF, Type.LIST, new ArrayList<>(), Importance.LOW, EVENT_COLLECTOR_INDEX_ALLOWED_DOC)
      .define(EVENT_COLLECTOR_INDEX_DEFAULT_CONF, Type.STRING, Importance.HIGH, EVENT_COLLECTOR_INDEX_DEFAULT_DOC)
      .define(TOPIC_PER_INDEX_CONF, Type.BOOLEAN, false, Importance.MEDIUM, TOPIC_PER_INDEX_DOC)
      .define(TOPIC_PREFIX_CONF, Type.STRING, Importance.HIGH, TOPIC_PREFIX_DOC)
      .define(BATCH_SIZE_CONF, Type.INT, 10000, Importance.LOW, BATCH_SIZE_DOC)
      .define(BACKOFF_MS_CONF, Type.INT, 100, Importance.LOW, BACKOFF_MS_DOC);
}
 
public static ConfigDef conf() {
  return new ConfigDef()
      .define(AUTHORIZATION_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, AUTHORIZATION_TOKEN_DOC)
      .define(REMOTE_HOST_CONF, Type.STRING, Importance.HIGH, REMOTE_HOST_DOC)
      .define(REMOTE_PORT_CONF, Type.INT, 8088, Importance.MEDIUM, REMOTE_PORT_DOC)
      .define(SSL_CONF, Type.BOOLEAN, true, Importance.HIGH, SSL_DOC)
      .define(SSL_VALIDATE_CERTIFICATES_CONF, Type.BOOLEAN, true, Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
      .define(SSL_TRUSTSTORE_PATH_CONF, Type.STRING, "", Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
      .define(SSL_TRUSTSTORE_PASSWORD_CONF, Type.PASSWORD, "", Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
      .define(CONNECT_TIMEOUT_CONF, Type.INT, 20000, Importance.LOW, CONNECT_TIMEOUT_DOC)
      .define(READ_TIMEOUT_CONF, Type.INT, 30000, Importance.LOW, READ_TIMEOUT_DOC)
      .define(CURL_LOGGING_ENABLED_CONF, Type.BOOLEAN, false, Importance.LOW, CURL_LOGGING_ENABLED_DOC);
}
 
public static ConfigDef config() {
  return new ConfigDef()
      .define(
          ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
              .documentation(MY_SETTING_DOC)
              .importance(Importance.HIGH)
              .build()
      );
}
 
public static ConfigDef config() {
  return new ConfigDef()
      .define(
          ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
              .documentation(MY_SETTING_DOC)
              .importance(Importance.HIGH)
              .build()
      );
}
 
public static ConfigDef config() {
  return new ConfigDef()
      .define(
          ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
              .documentation(MY_SETTING_DOC)
              .importance(Importance.HIGH)
              .build()
      );
}
 
源代码17 项目: rest-utils   文件: RestConfig.java
public static ConfigDef baseConfigDef(
    int port,
    String listeners,
    String responseMediatypePreferred,
    String responseMediatypeDefault,
    String metricsJmxPrefix
) {
  return incompleteBaseConfigDef()
      .define(
          PORT_CONFIG,
          Type.INT,
          port,
          Importance.LOW,
          PORT_CONFIG_DOC
      ).define(
          LISTENERS_CONFIG,
          Type.LIST,
          listeners,
          Importance.HIGH,
          LISTENERS_DOC
      ).define(
          RESPONSE_MEDIATYPE_PREFERRED_CONFIG,
          Type.LIST,
          responseMediatypePreferred,
          Importance.LOW,
          RESPONSE_MEDIATYPE_PREFERRED_CONFIG_DOC
      ).define(
          RESPONSE_MEDIATYPE_DEFAULT_CONFIG,
          Type.STRING,
          responseMediatypeDefault,
          Importance.LOW,
          RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC
      ).define(
          METRICS_JMX_PREFIX_CONFIG,
          Type.STRING,
          metricsJmxPrefix,
          Importance.LOW,
          METRICS_JMX_PREFIX_DOC
      );
}
 
源代码18 项目: mongo-kafka   文件: MongoSinkConfig.java
private static ConfigDef createConfigDef() {
  ConfigDef configDef =
      new ConfigDef() {

        @Override
        @SuppressWarnings("unchecked")
        public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
          Map<String, ConfigValue> results = super.validateAll(props);
          // Don't validate child configs if the top level configs are broken
          if (results.values().stream().anyMatch((c) -> !c.errorMessages().isEmpty())) {
            return results;
          }

          boolean hasTopicsConfig = !props.getOrDefault(TOPICS_CONFIG, "").trim().isEmpty();
          boolean hasTopicsRegexConfig =
              !props.getOrDefault(TOPICS_REGEX_CONFIG, "").trim().isEmpty();

          if (hasTopicsConfig && hasTopicsRegexConfig) {
            results
                .get(TOPICS_CONFIG)
                .addErrorMessage(
                    format(
                        "%s and %s are mutually exclusive options, but both are set.",
                        TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
          } else if (!hasTopicsConfig && !hasTopicsRegexConfig) {
            results
                .get(TOPICS_CONFIG)
                .addErrorMessage(
                    format("Must configure one of %s or %s", TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
          }

          if (hasTopicsConfig) {
            List<String> topics = (List<String>) results.get(TOPICS_CONFIG).value();
            topics.forEach(
                topic -> results.putAll(MongoSinkTopicConfig.validateAll(topic, props)));
          } else if (hasTopicsRegexConfig) {
            results.putAll(MongoSinkTopicConfig.validateRegexAll(props));
          }
          return results;
        }
      };
  String group = "Connection";
  int orderInGroup = 0;
  configDef.define(
      TOPICS_CONFIG,
      Type.LIST,
      TOPICS_DEFAULT,
      Importance.HIGH,
      TOPICS_DOC,
      group,
      ++orderInGroup,
      Width.MEDIUM,
      TOPICS_DISPLAY);

  configDef.define(
      TOPICS_REGEX_CONFIG,
      Type.STRING,
      TOPICS_REGEX_DEFAULT,
      Validators.isAValidRegex(),
      Importance.HIGH,
      TOPICS_REGEX_DOC,
      group,
      ++orderInGroup,
      Width.MEDIUM,
      TOPICS_REGEX_DISPLAY);

  configDef.define(
      CONNECTION_URI_CONFIG,
      Type.STRING,
      CONNECTION_URI_DEFAULT,
      errorCheckingValueValidator("A valid connection string", ConnectionString::new),
      Importance.HIGH,
      CONNECTION_URI_DOC,
      group,
      ++orderInGroup,
      Width.MEDIUM,
      CONNECTION_URI_DISPLAY);

  group = "Overrides";
  orderInGroup = 0;
  configDef.define(
      TOPIC_OVERRIDE_CONFIG,
      Type.STRING,
      TOPIC_OVERRIDE_DEFAULT,
      Validators.topicOverrideValidator(),
      Importance.LOW,
      TOPIC_OVERRIDE_DOC,
      group,
      ++orderInGroup,
      Width.MEDIUM,
      TOPIC_OVERRIDE_DISPLAY);

  MongoSinkTopicConfig.BASE_CONFIG.configKeys().values().forEach(configDef::define);
  return configDef;
}
 
private static void addDatabaseOptions(ConfigDef config) {
    int orderInGroup = 0;
    config.define(
            ES_HOST_CONF,
            Type.STRING,
            Importance.HIGH,
            ES_HOST_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            Width.LONG,
            ES_HOST_DISPLAY,
            Collections.singletonList(INDEX_PREFIX_CONFIG)
    ).define(
            ES_PORT_CONF,
            Type.STRING,
            Importance.HIGH,
            ES_PORT_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            Width.LONG,
            ES_PORT_DISPLAY,
            Collections.singletonList(INDEX_PREFIX_CONFIG)
    ).define(
            ES_USER_CONF,
            Type.STRING,
            null,
            Importance.HIGH,
            ES_USER_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            Width.LONG,
            ES_USER_DISPLAY
    ).define(
            ES_PWD_CONF,
            Type.STRING,
            null,
            Importance.HIGH,
            ES_PWD_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            Width.SHORT,
            ES_PWD_DISPLAY
    ).define(
            CONNECTION_ATTEMPTS_CONFIG,
            Type.STRING,
            CONNECTION_ATTEMPTS_DEFAULT,
            Importance.LOW,
            CONNECTION_ATTEMPTS_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            ConfigDef.Width.SHORT,
            CONNECTION_ATTEMPTS_DISPLAY
    ).define(
            CONNECTION_BACKOFF_CONFIG,
            Type.STRING,
            CONNECTION_BACKOFF_DEFAULT,
            Importance.LOW,
            CONNECTION_BACKOFF_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            Width.SHORT,
            CONNECTION_BACKOFF_DISPLAY
    ).define(
            INDEX_PREFIX_CONFIG,
            Type.STRING,
            INDEX_PREFIX_DEFAULT,
            Importance.MEDIUM,
            INDEX_PREFIX_DOC,
            DATABASE_GROUP,
            ++orderInGroup,
            Width.LONG,
            INDEX_PREFIX_DISPLAY
    );
}
 
public static ConfigDef conf() {
  String group = "AWS";
  int orderInGroup = 0;
  return new ConfigDef()
    .define(REGION_CONFIG,
      Type.STRING,
      NO_DEFAULT_VALUE,
      new RegionValidator(),
      Importance.HIGH,
      REGION_DOC_CONFIG,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      REGION_DISPLAY_CONFIG,
      new RegionRecommender())

    .define(CREDENTIALS_PROVIDER_CLASS_CONFIG,
      Type.CLASS,
      CREDENTIALS_PROVIDER_CLASS_DEFAULT,
      new CredentialsProviderValidator(),
      Importance.HIGH,
      CREDENTIALS_PROVIDER_DOC_CONFIG,
      group,
      ++orderInGroup,
      ConfigDef.Width.MEDIUM,
      CREDENTIALS_PROVIDER_DISPLAY_CONFIG)

    .define(FUNCTION_NAME_CONFIG,
      Type.STRING,
      NO_DEFAULT_VALUE,
      Importance.HIGH,
      FUNCTION_NAME_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      FUNCTION_NAME_DISPLAY)

    .define(RETRY_BACKOFF_CONFIG,
      Type.LONG,
      RETRY_BACKOFF_DEFAULT,
      Importance.LOW,
      RETRY_BACKOFF_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      RETRY_BACKOFF_DISPLAY)

    .define(INVOCATION_TYPE_CONFIG,
      Type.STRING,
      INVOCATION_TYPE_DEFAULT,
      new InvocationTypeValidator(),
      Importance.LOW,
      INVOCATION_TYPE_DOC_CONFIG,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      INVOCATION_TYPE_DISPLAY_CONFIG,
      new InvocationTypeRecommender())

    .define(PAYLOAD_CONVERTER_CONFIG,
      Type.CLASS,
      PAYLOAD_CONVERTER_DEFAULT,
      new PayloadConverterValidator(),
      Importance.LOW,
      PAYLOAD_CONVERTER_DOC_CONFIG,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      PAYLOAD_CONVERTER_DISPLAY_CONFIG,
      new PayloadConverterRecommender())
    ;
}
 
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
  String group = "REST_HTTP";
  int orderInGroup = 0;
  ConfigDef config =  new ConfigDef()
    .define(REQUEST_BODY_CONFIG,
      Type.STRING,
      REQUEST_BODY_DEFAULT,
      Importance.LOW,
      REQUEST_BODY_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.LONG,
      REQUEST_BODY_DISPLAY)

    .define(REQUEST_PARAMETER_NAMES_CONFIG,
      Type.LIST,
      REQUEST_PARAMETER_NAMES_DEFAULT,
      Importance.HIGH,
      REQUEST_PARAMETER_NAMES_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      REQUEST_PARAMETER_NAMES_DISPLAY)

    .define(REQUEST_HEADERS_CONFIG,
      Type.LIST,
      REQUEST_HEADERS_DEFAULT,
      Importance.HIGH,
      REQUEST_HEADERS_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      REQUEST_HEADERS_DISPLAY)
    ;

  // This is a bit hacky and there may be a better way of doing it, but I don't know it.
  // We need to create config items dynamically, based on the parameter names,
  // so we need a 2 pass parse of the config.
  List<String> paramNames = (List) config.parse(unparsedConfig).get(REQUEST_PARAMETER_NAMES_CONFIG);

  for(String paramName : paramNames) {
    config.define(String.format(REQUEST_PARAMETER_VALUE_CONFIG, paramName),
      Type.STRING,
      REQUEST_PARAMETER_VALUE_DEFAULT,
      Importance.HIGH,
      String.format(REQUEST_PARAMETER_VALUE_DOC, paramName),
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      String.format(REQUEST_PARAMETER_VALUE_DISPLAY, paramName));
  }

  return(config);
}
 
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
  String group = "REST_HTTP";
  int orderInGroup = 0;
  ConfigDef config = new ConfigDef()
    .define(VALUE_PROVIDER_CONFIG,
      Type.CLASS,
      VALUE_PROVIDER_DEFAULT,
      new InstanceOfValidator(ValueProvider.class),
      Importance.HIGH,
      VALUE_PROVIDER_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      VALUE_PROVIDER_DISPLAY,
      new ServiceProviderInterfaceRecommender(ValueProvider.class))

    .define(TEMPLATE_ENGINE_CONFIG,
      Type.CLASS,
      TEMPLATE_ENGINE_DEFAULT,
      new InstanceOfValidator(TemplateEngine.class),
      Importance.HIGH,
      TEMPLATE_ENGINE_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      TEMPLATE_ENGINE_DISPLAY,
      new ServiceProviderInterfaceRecommender(TemplateEngine.class))

    .define(REQUEST_BODY_TEMPLATE_CONFIG,
      Type.STRING,
      REQUEST_BODY_TEMPLATE_DEFAULT,
      Importance.LOW,
      REQUEST_BODY_TEMPLATE_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.LONG,
      REQUEST_BODY_TEMPLATE_DISPLAY)

    .define(REQUEST_PARAMETER_NAMES_CONFIG,
      Type.LIST,
      REQUEST_PARAMETER_NAMES_DEFAULT,
      Importance.LOW,
      REQUEST_PARAMETER_NAMES_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      REQUEST_PARAMETER_NAMES_DISPLAY)

    .define(REQUEST_HEADERS_TEMPLATE_CONFIG,
      Type.LIST,
      REQUEST_HEADERS_TEMPLATE_DEFAULT,
      Importance.LOW,
      REQUEST_HEADERS_TEMPLATE_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      REQUEST_HEADERS_TEMPLATE_DISPLAY)
    ;

  // This is a bit hacky and there may be a better way of doing it, but I don't know it.
  // We need to create config items dynamically, based on the parameter names,
  // so we need a 2 pass parse of the config.
  List<String> paramNames = (List) config.parse(unparsedConfig).get(REQUEST_PARAMETER_NAMES_CONFIG);

  for(String paramName : paramNames) {
    config.define(String.format(REQUEST_PARAMETER_TEMPLATE_CONFIG, paramName),
      Type.STRING,
      REQUEST_PARAMETER_TEMPLATE_DEFAULT,
      Importance.HIGH,
      String.format(REQUEST_PARAMETER_TEMPLATE_DOC, paramName),
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      String.format(REQUEST_PARAMETER_TEMPLATE_DISPLAY, paramName));
  }

  return(config);
}
 
public static ConfigDef conf() {
  String group = "REST_HTTP";
  int orderInGroup = 0;
  return new ConfigDef()
    .define(HTTP_CONNECTION_TIMEOUT_CONFIG,
      Type.LONG,
      HTTP_CONNECTION_TIMEOUT_DEFAULT,
      Importance.LOW,
      HTTP_CONNECTION_TIMEOUT_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      HTTP_CONNECTION_TIMEOUT_DISPLAY)

    .define(HTTP_READ_TIMEOUT_CONFIG,
      Type.LONG,
      HTTP_READ_TIMEOUT_DEFAULT,
      Importance.LOW,
      HTTP_READ_TIMEOUT_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      HTTP_READ_TIMEOUT_DISPLAY)

    .define(HTTP_KEEP_ALIVE_DURATION_CONFIG,
      Type.LONG,
      HTTP_KEEP_ALIVE_DURATION_DEFAULT,
      Importance.LOW,
      HTTP_KEEP_ALIVE_DURATION_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      HTTP_KEEP_ALIVE_DURATION_DISPLAY)

    .define(HTTP_MAX_IDLE_CONNECTION_CONFIG,
      Type.INT,
      HTTP_MAX_IDLE_CONNECTION_DEFAULT,
      Importance.LOW,
      HTTP_MAX_IDLE_CONNECTION_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      HTTP_MAX_IDLE_CONNECTION_DISPLAY)
    ;
}
 
public static ConfigDef conf() {
  String group = "REST";
  int orderInGroup = 0;
  return new ConfigDef()
    .define(SINK_METHOD_CONFIG,
      Type.STRING,
      SINK_METHOD_DEFAULT,
      new MethodValidator(),
      Importance.HIGH,
      SINK_METHOD_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SINK_METHOD_DISPLAY,
      new MethodRecommender())

    .define(SINK_HEADERS_LIST_CONFIG,
      Type.LIST,
      Collections.EMPTY_LIST,
      Importance.HIGH,
      SINK_HEADERS_LIST_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SINK_HEADERS_LIST_DISPLAY)

    .define(SINK_URL_CONFIG,
      Type.STRING,
      NO_DEFAULT_VALUE,
      Importance.HIGH,
      SINK_URL_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SINK_URL_DISPLAY)

    .define(SINK_RETRY_BACKOFF_CONFIG,
      Type.LONG,
      SINK_RETRY_BACKOFF_DEFAULT,
      Importance.LOW,
      SINK_RETRY_BACKOFF_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SINK_RETRY_BACKOFF_DISPLAY)

    .define(SINK_HTTP_MAX_RETRIES_CONFIG,
      Type.INT,
      SINK_HTTP_MAX_RETRIES_DEFAULT,
      Importance.LOW,
      SINK_HTTP_MAX_RETRIES_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SINK_HTTP_MAX_RETRIES_DISPLAY)

    .define(SINK_HTTP_CODES_WHITELIST_CONFIG,
      Type.STRING,
      SINK_HTTP_CODES_WHITELIST_DEFAULT,
      Importance.LOW,
      SINK_HTTP_CODES_WHITELIST_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SINK_HTTP_CODES_WHITELIST_DISPLAY)

    .define(SINK_HTTP_CODES_BLACKLIST_CONFIG,
      Type.STRING,
      SINK_HTTP_CODES_BLACKLIST_DEFAULT,
      Importance.LOW,
      SINK_HTTP_CODES_BLACKLIST_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SINK_HTTP_CODES_BLACKLIST_DISPLAY)

    .define(SINK_REQUEST_EXECUTOR_CONFIG,
      Type.CLASS,
      SINK_REQUEST_EXECUTOR_DEFAULT,
      Importance.LOW,
      SINK_REQUEST_EXECUTOR_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SINK_REQUEST_EXECUTOR_DISPLAY)

    .define(SINK_DATE_FORMAT_CONFIG,
      Type.STRING,
      SINK_DATE_FORMAT_DEFAULT,
      Importance.LOW,
      SINK_DATE_FORMAT_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SINK_DATE_FORMAT_DISPLAY)
    ;
}
 
public static ConfigDef conf() {
  String group = "REST";
  int orderInGroup = 0;
  return new ConfigDef()
    .define(SOURCE_POLL_INTERVAL_CONFIG,
      Type.LONG,
      SOURCE_POLL_INTERVAL_DEFAULT,
      Importance.LOW,
      SOURCE_POLL_INTERVAL_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SOURCE_POLL_INTERVAL_DISPLAY)

    .define(SOURCE_METHOD_CONFIG,
      Type.STRING,
      SOURCE_METHOD_DEFAULT,
      new MethodValidator(),
      Importance.HIGH,
      SOURCE_METHOD_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SOURCE_METHOD_DISPLAY,
      new MethodRecommender())

    .define(SOURCE_URL_CONFIG,
      Type.STRING,
      NO_DEFAULT_VALUE,
      Importance.HIGH,
      SOURCE_URL_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SOURCE_URL_DISPLAY)

    .define(SOURCE_PAYLOAD_GENERATOR_CONFIG,
      Type.CLASS,
      SOURCE_PAYLOAD_GENERATOR_DEFAULT,
      new InstanceOfValidator(PayloadGenerator.class),
      Importance.HIGH,
      SOURCE_PAYLOAD_GENERATOR_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SOURCE_PAYLOAD_GENERATOR_DISPLAY,
      new ServiceProviderInterfaceRecommender<>(PayloadGenerator.class))

    .define(SOURCE_TOPIC_SELECTOR_CONFIG,
      Type.CLASS,
      SOURCE_TOPIC_SELECTOR_DEFAULT,
      new InstanceOfValidator(TopicSelector.class),
      Importance.HIGH,
      SOURCE_TOPIC_SELECTOR_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.SHORT,
      SOURCE_TOPIC_SELECTOR_DISPLAY,
      new ServiceProviderInterfaceRecommender<>(TopicSelector.class))

    .define(SOURCE_REQUEST_EXECUTOR_CONFIG,
      Type.CLASS,
      SOURCE_REQUEST_EXECUTOR_DEFAULT,
      new InstanceOfValidator(RequestExecutor.class),
      Importance.LOW,
      SOURCE_REQUEST_EXECUTOR_DOC,
      group,
      ++orderInGroup,
      ConfigDef.Width.NONE,
      SOURCE_REQUEST_EXECUTOR_DISPLAY,
      new ServiceProviderInterfaceRecommender<>(RequestExecutor.class))
    ;
}
 
private static void defineMessageGroup(final ConfigDef definitions) {
  int order = 0;

  definitions
      .define(
          MESSAGE_PATH_NAME_CONFIG,
          Type.STRING,
          MESSAGE_PATH_NAME_DEFAULT,
          Importance.HIGH,
          MESSAGE_PATH_NAME_DOC,
          MESSAGE_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Message name jsonpath query")
      .define(
          MESSAGE_PATH_KEY_CONFIG,
          Type.STRING,
          MESSAGE_PATH_KEY_DEFAULT,
          Importance.HIGH,
          MESSAGE_PATH_KEY_DOC,
          MESSAGE_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Correlation key jsonpath query")
      .define(
          MESSAGE_PATH_VARIABLES_CONFIG,
          Type.STRING,
          MESSAGE_PATH_VARIABLES_DEFAULT,
          Importance.MEDIUM,
          MESSAGE_PATH_VARIABLES_DOC,
          MESSAGE_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Message variables jsonpath query")
      .define(
          MESSAGE_PATH_TTL_CONFIG,
          Type.STRING,
          MESSAGE_PATH_TTL_DEFAULT,
          Importance.LOW,
          MESSAGE_PATH_TTL_DOC,
          MESSAGE_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Message TTL jsonpath query");
}
 
private static void defineWorkerGroup(final ConfigDef definitions) {
  int order = 0;

  definitions
      .define(
          WORKER_NAME_CONFIG,
          Type.STRING,
          WORKER_NAME_DEFAULT,
          Importance.LOW,
          WORKER_NAME_DOC,
          WORKER_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Name")
      .define(
          MAX_JOBS_TO_ACTIVATE_CONFIG,
          Type.INT,
          MAX_JOBS_TO_ACTIVATE_DEFAULT,
          Importance.MEDIUM,
          MAX_JOBS_TO_ACTIVATE_DOC,
          WORKER_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Max jobs to activate")
      .define(
          JOB_TIMEOUT_CONFIG,
          Type.LONG,
          JOB_TIMEOUT_DEFAULT,
          Importance.MEDIUM,
          JOB_TIMEOUT_DOC,
          WORKER_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Job timeout")
      .define(
          JOB_TYPES_CONFIG,
          Type.LIST,
          JOB_TYPES_DEFAULT,
          Importance.HIGH,
          JOB_TYPES_DOC,
          WORKER_CONFIG_GROUP,
          ++order,
          Width.LONG,
          "Job types")
      .define(
          JOB_HEADER_TOPICS_CONFIG,
          Type.STRING,
          JOB_HEADER_TOPICS_DEFAULT,
          Importance.HIGH,
          JOB_HEADER_TOPICS_DOC,
          WORKER_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Job topics header")
      .define(
          JOB_VARIABLES_CONFIG,
          Type.LIST,
          JOB_VARIABLES_DEFAULT,
          Importance.LOW,
          JOB_VARIABLES_DOC,
          WORKER_CONFIG_GROUP,
          ++order,
          Width.SHORT,
          "Job variables");
}
 
源代码28 项目: pubsub   文件: CloudPubSubSinkConnector.java
@Override
public ConfigDef config() {
  return new ConfigDef()
      .define(
          ConnectorUtils.CPS_PROJECT_CONFIG,
          Type.STRING,
          Importance.HIGH,
          "The project containing the topic to which to publish.")
      .define(
          ConnectorUtils.CPS_TOPIC_CONFIG,
          Type.STRING,
          Importance.HIGH,
          "The topic to which to publish.")
      .define(
          MAX_BUFFER_SIZE_CONFIG,
          Type.INT,
          DEFAULT_MAX_BUFFER_SIZE,
          ConfigDef.Range.between(1, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The maximum number of messages that can be received for the messages on a topic "
              + "partition before publishing them to Cloud Pub/Sub.")
      .define(
          MAX_BUFFER_BYTES_CONFIG,
          Type.LONG,
          DEFAULT_MAX_BUFFER_BYTES,
          ConfigDef.Range.between(1, DEFAULT_MAX_BUFFER_BYTES),
          Importance.MEDIUM,
          "The maximum number of bytes that can be received for the messages on a topic "
              + "partition before publishing the messages to Cloud Pub/Sub.")
      .define(
          MAX_DELAY_THRESHOLD_MS,
          Type.INT,
          DEFAULT_DELAY_THRESHOLD_MS,
          ConfigDef.Range.between(1, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The maximum amount of time to wait after receiving the first message in a batch for a "
              + "before publishing the messages to Cloud Pub/Sub.")
      .define(
          MAX_REQUEST_TIMEOUT_MS,
          Type.INT,
          DEFAULT_REQUEST_TIMEOUT_MS,
          ConfigDef.Range.between(10000, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The maximum amount of time to wait for a single publish request to Cloud Pub/Sub.")
      .define(
          MAX_TOTAL_TIMEOUT_MS,
          Type.INT,
          DEFAULT_TOTAL_TIMEOUT_MS,
          ConfigDef.Range.between(10000, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The maximum amount of time to wait for a publish to complete (including retries) in "
              + "Cloud Pub/Sub.")
      .define(
          MAX_SHUTDOWN_TIMEOUT_MS,
          Type.INT,
          DEFAULT_SHUTDOWN_TIMEOUT_MS,
          ConfigDef.Range.between(10000, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The maximum amount of time to wait for a publisher to shutdown when stopping task "
              + "in Kafka Connect.")
      .define(
          PUBLISH_KAFKA_METADATA,
          Type.BOOLEAN,
          false,
          Importance.MEDIUM,
          "When true, include the Kafka topic, partition, offset, and timestamp as message "
              + "attributes when a message is published to Cloud Pub/Sub.")
      .define(
         PUBLISH_KAFKA_HEADERS,
         Type.BOOLEAN,
         false,
         Importance.MEDIUM,
         "When true, include any headers as attributes when a message is published to Cloud Pub/Sub.")
      .define(CPS_MESSAGE_BODY_NAME,
          Type.STRING,
          DEFAULT_MESSAGE_BODY_NAME,
          Importance.MEDIUM,
          "When using a struct or map value schema, this field or key name indicates that the "
              + "corresponding value will go into the Pub/Sub message body.")
      .define(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
          Type.STRING,
          null,
          Importance.HIGH,
          "The path to the GCP credentials file")
      .define(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
          Type.STRING,
          null,
          Importance.HIGH,
          "GCP JSON credentials");
}
 
源代码29 项目: pubsub   文件: CloudPubSubSourceConnector.java
@Override
public ConfigDef config() {
  return new ConfigDef()
      .define(
          KAFKA_TOPIC_CONFIG,
          Type.STRING,
          Importance.HIGH,
          "The topic in Kafka which will receive messages that were pulled from Cloud Pub/Sub.")
      .define(
          ConnectorUtils.CPS_PROJECT_CONFIG,
          Type.STRING,
          Importance.HIGH,
          "The project containing the topic from which to pull messages.")
      .define(
          CPS_SUBSCRIPTION_CONFIG,
          Type.STRING,
          Importance.HIGH,
          "The name of the subscription to Cloud Pub/Sub.")
      .define(
          CPS_MAX_BATCH_SIZE_CONFIG,
          Type.INT,
          DEFAULT_CPS_MAX_BATCH_SIZE,
          ConfigDef.Range.between(1, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The minimum number of messages to batch per pull request to Cloud Pub/Sub.")
      .define(
          KAFKA_MESSAGE_KEY_CONFIG,
          Type.STRING,
          null,
          Importance.MEDIUM,
          "The Cloud Pub/Sub message attribute to use as a key for messages published to Kafka.")
      .define(
          KAFKA_MESSAGE_TIMESTAMP_CONFIG,
          Type.STRING,
          null,
          Importance.MEDIUM,
          "The optional Cloud Pub/Sub message attribute to use as a timestamp for messages "
              + "published to Kafka. The timestamp is Long value.")
      .define(
          KAFKA_PARTITIONS_CONFIG,
          Type.INT,
          DEFAULT_KAFKA_PARTITIONS,
          ConfigDef.Range.between(1, Integer.MAX_VALUE),
          Importance.MEDIUM,
          "The number of Kafka partitions for the Kafka topic in which messages will be "
              + "published to.")
      .define(
          KAFKA_PARTITION_SCHEME_CONFIG,
          Type.STRING,
          DEFAULT_KAFKA_PARTITION_SCHEME,
          new PartitionScheme.Validator(),
          Importance.MEDIUM,
          "The scheme for assigning a message to a partition in Kafka.")
      .define(
          ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
          Type.STRING,
          null,
          Importance.HIGH,
          "The path to the GCP credentials file")
      .define(
          ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
          Type.STRING,
          null,
          Importance.HIGH,
          "GCP JSON credentials")
      .define(
          USE_KAFKA_HEADERS,
          Type.BOOLEAN,
          false,
          Importance.LOW,
          "Use Kafka record headers to store Pub/Sub message attributes");
}
 
源代码30 项目: rya   文件: RyaSinkConfig.java
/**
 * @param configDef - The configuration schema definition that will be updated to include
 *   this configuration's fields. (not null)
 */
public static void addCommonDefinitions(final ConfigDef configDef) {
    requireNonNull(configDef);
    configDef.define(RYA_INSTANCE_NAME, Type.STRING, Importance.HIGH, RYA_INSTANCE_NAME_DOC);
}
 
 类所在包
 类方法
 同包方法