下面列出了怎么用org.apache.kafka.common.config.ConfigDef.Importance的API类实例代码及写法,或者点击链接到github查看源代码。
public static ConfigDef conf() {
return new ConfigDef()
.define(DB_NAME_ALIAS, Type.STRING, Importance.HIGH, "Db Name Alias")
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "Topic")
.define(DB_NAME_CONFIG, Type.STRING, Importance.HIGH, "Db Name")
.define(DB_HOST_NAME_CONFIG,Type.STRING,Importance.HIGH,"Db HostName")
.define(DB_PORT_CONFIG,Type.INT,Importance.HIGH,"Db Port")
.define(DB_USER_CONFIG,Type.STRING,Importance.HIGH,"Db User")
.define(DB_USER_PASSWORD_CONFIG,Type.STRING,Importance.HIGH,"Db User Password")
.define(TABLE_WHITELIST,Type.STRING,Importance.HIGH,"TAbles will be mined")
.define(PARSE_DML_DATA,Type.BOOLEAN,Importance.HIGH,"Parse DML Data")
.define(DB_FETCH_SIZE,Type.INT,Importance.HIGH,"Database Record Fetch Size")
.define(RESET_OFFSET,Type.BOOLEAN,Importance.HIGH,"Reset Offset")
.define(START_SCN,Type.STRING,"",Importance.LOW,"Start SCN")
.define(MULTITENANT, Type.BOOLEAN, Importance.HIGH, "Database is multitenant (container)")
.define(TABLE_BLACKLIST, Type.STRING, Importance.LOW, "Table will not be mined");
}
public static ConfigDef conf() {
int order = 0;
return new ConfigDef()
.define(
FS_URIS,
Type.LIST,
ConfigDef.NO_DEFAULT_VALUE,
Importance.HIGH,
FS_URIS_DOC,
CONNECTOR_GROUP,
++order,
ConfigDef.Width.LONG,
FS_URIS_DISPLAY
).define(
TOPIC,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
Importance.HIGH,
TOPIC_DOC,
CONNECTOR_GROUP,
++order,
ConfigDef.Width.LONG,
TOPIC_DISPLAY
);
}
private static void addModeOptions(ConfigDef config) {
int orderInGroup = 0;
config.define(
MODE_CONFIG,
Type.STRING,
MODE_UNSPECIFIED,
ConfigDef.ValidString.in(
MODE_UNSPECIFIED,
MODE_BULK,
MODE_TIMESTAMP,
MODE_INCREMENTING,
MODE_TIMESTAMP_INCREMENTING
),
Importance.HIGH,
MODE_DOC,
MODE_GROUP,
++orderInGroup,
Width.MEDIUM,
MODE_DISPLAY,
Collections.singletonList(
INCREMENTING_FIELD_NAME_CONFIG
)
).define(
INCREMENTING_FIELD_NAME_CONFIG,
Type.STRING,
INCREMENTING_FIELD_NAME_DEFAULT,
Importance.MEDIUM,
INCREMENTING_FIELD_NAME_DOC,
MODE_GROUP,
++orderInGroup,
Width.MEDIUM,
INCREMENTING_FIELD_NAME_DISPLAY
);
}
private static void addConnectorOptions(ConfigDef config) {
int orderInGroup = 0;
config.define(
POLL_INTERVAL_MS_CONFIG,
Type.STRING,
POLL_INTERVAL_MS_DEFAULT,
Importance.HIGH,
POLL_INTERVAL_MS_DOC,
CONNECTOR_GROUP,
++orderInGroup,
Width.SHORT,
POLL_INTERVAL_MS_DISPLAY
).define(
BATCH_MAX_ROWS_CONFIG,
Type.STRING,
BATCH_MAX_ROWS_DEFAULT,
Importance.LOW,
BATCH_MAX_ROWS_DOC,
CONNECTOR_GROUP,
++orderInGroup,
Width.SHORT,
BATCH_MAX_ROWS_DISPLAY
).define(
TOPIC_PREFIX_CONFIG,
Type.STRING,
Importance.HIGH,
TOPIC_PREFIX_DOC,
CONNECTOR_GROUP,
++orderInGroup,
Width.MEDIUM,
TOPIC_PREFIX_DISPLAY
);
}
public static ConfigDef conf() {
String group = "REST_HTTP";
int orderInGroup = 0;
return new ConfigDef()
.define(TOPIC_LIST_CONFIG,
Type.LIST,
NO_DEFAULT_VALUE,
Importance.HIGH,
TOPIC_LIST_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
TOPIC_LIST_DISPLAY)
;
}
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
String group = "REST_HTTP";
int orderInGroup = 0;
ConfigDef config = new ConfigDef()
.define(RESPONSE_VAR_NAMES_CONFIG,
Type.LIST,
RESPONSE_VAR_NAMES_DEFAULT,
Importance.LOW,
RESPONSE_VAR_NAMES_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
RESPONSE_VAR_NAMES_DISPLAY)
;
// This is a bit hacky and there may be a better way of doing it, but I don't know it.
// We need to create config items dynamically, based on the parameter names,
// so we need a 2 pass parse of the config.
List<String> varNames = (List) config.parse(unparsedConfig).get(RESPONSE_VAR_NAMES_CONFIG);
for(String varName : varNames) {
config.define(String.format(RESPONSE_VAR_XPATH_CONFIG, varName),
Type.STRING,
RESPONSE_VAR_XPATH_DEFAULT,
Importance.HIGH,
String.format(RESPONSE_VAR_XPATH_DOC, varName),
group,
++orderInGroup,
ConfigDef.Width.SHORT,
String.format(RESPONSE_VAR_XPATH_DISPLAY, varName));
}
return(config);
}
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
String group = "REST_HTTP";
int orderInGroup = 0;
ConfigDef config = new ConfigDef()
.define(RESPONSE_VAR_NAMES_CONFIG,
Type.LIST,
RESPONSE_VAR_NAMES_DEFAULT,
Importance.LOW,
RESPONSE_VAR_NAMES_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
RESPONSE_VAR_NAMES_DISPLAY)
;
// This is a bit hacky and there may be a better way of doing it, but I don't know it.
// We need to create config items dynamically, based on the parameter names,
// so we need a 2 pass parse of the config.
List<String> varNames = (List) config.parse(unparsedConfig).get(RESPONSE_VAR_NAMES_CONFIG);
for(String varName : varNames) {
config.define(String.format(RESPONSE_VAR_REGEX_CONFIG, varName),
Type.STRING,
RESPONSE_VAR_REGEX_DEFAULT,
Importance.HIGH,
String.format(RESPONSE_VAR_REGEX_DOC, varName),
group,
++orderInGroup,
ConfigDef.Width.SHORT,
String.format(RESPONSE_VAR_REGEX_DISPLAY, varName));
}
return(config);
}
public static void defineClientGroup(final ConfigDef definitions) {
int order = 0;
definitions
.define(
BROKER_CONTACTPOINT_CONFIG,
Type.STRING,
BROKER_CONTACTPOINT_DEFAULT,
Importance.HIGH,
BROKER_CONTACTPOINT_DOC,
CLIENT_CONFIG_GROUP,
++order,
Width.SHORT,
"Broker contact point")
.define(
REQUEST_TIMEOUT_CONFIG,
Type.LONG,
REQUEST_TIMEOUT_DEFAULT,
Importance.LOW,
REQUEST_TIMEOUT_DOC,
CLIENT_CONFIG_GROUP,
++order,
Width.SHORT,
"Request timeout")
.define(
USE_PLAINTEXT_CONFIG,
Type.BOOLEAN,
USE_PLAINTEXT_DEFAULT,
Importance.LOW,
USE_PLAINTEXT_DOC,
CLIENT_CONFIG_GROUP,
++order,
Width.SHORT,
"Use plaintext connection");
}
public static ConfigDef conf() {
return new ConfigDef()
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC)
.define(OWNER_CONFIG, Type.STRING, Importance.HIGH, OWNER_DOC)
.define(REPO_CONFIG, Type.STRING, Importance.HIGH, REPO_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 100, new BatchSizeValidator(), Importance.LOW, BATCH_SIZE_DOC)
.define(SINCE_CONFIG, Type.STRING, ZonedDateTime.now().minusYears(1).toInstant().toString(),
new TimestampValidator(), Importance.HIGH, SINCE_DOC)
.define(AUTH_USERNAME_CONFIG, Type.STRING, "", Importance.HIGH, AUTH_USERNAME_DOC)
.define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC);
}
public static ConfigDef conf() {
return new ConfigDef()
.define(TWITTER_DEBUG_CONF, Type.BOOLEAN, false, Importance.LOW, TWITTER_DEBUG_DOC)
.define(TWITTER_OAUTH_CONSUMER_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_CONSUMER_KEY_DOC)
.define(TWITTER_OAUTH_SECRET_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_SECRET_KEY_DOC)
.define(TWITTER_OAUTH_ACCESS_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_DOC)
.define(TWITTER_OAUTH_ACCESS_TOKEN_SECRET_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_SECRET_DOC)
.define(FILTER_KEYWORDS_CONF, Type.LIST, Importance.HIGH, FILTER_KEYWORDS_DOC)
.define(
ConfigKeyBuilder.of(FILTER_USER_IDS_CONF, Type.LIST)
.importance(Importance.HIGH)
.documentation(FILTER_USER_IDS_DOC)
.defaultValue(Collections.emptyList())
.validator(USERID_VALIDATOR)
.build()
)
.define(KAFKA_STATUS_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_STATUS_TOPIC_DOC)
.define(PROCESS_DELETES_CONF, Type.BOOLEAN, Importance.HIGH, PROCESS_DELETES_DOC)
.define(
ConfigKeyBuilder.of(QUEUE_EMPTY_MS_CONF, Type.INT)
.importance(Importance.LOW)
.documentation(QUEUE_EMPTY_MS_DOC)
.defaultValue(100)
.validator(ConfigDef.Range.atLeast(10))
.build()
)
.define(
ConfigKeyBuilder.of(QUEUE_BATCH_SIZE_CONF, Type.INT)
.importance(Importance.LOW)
.documentation(QUEUE_BATCH_SIZE_DOC)
.defaultValue(100)
.validator(ConfigDef.Range.atLeast(1))
.build()
);
}
/**
* Create default mConfig.
* @return default mConfig
*/
public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(MqttSourceConstant.KAFKA_TOPIC, Type.STRING, "mqtt", Importance.LOW,
"Kafka topic to put received data \n Depends on message processor")
.define(MqttSourceConstant.MQTT_CLIENT_ID, Type.STRING, null, Importance.MEDIUM,
"mqtt client id to use don't set to use random")
.define(MqttSourceConstant.MQTT_CLEAN_SESSION, Type.BOOLEAN, true, Importance.HIGH,
"use clean session in connection?")
.define(MqttSourceConstant.MQTT_CONNECTION_TIMEOUT, Type.INT, 30, Importance.LOW,
"connection timeout to use")
.define(MqttSourceConstant.MQTT_KEEP_ALIVE_INTERVAL, Type.INT, 60, Importance.LOW,
"keepalive interval to use")
.define(MqttSourceConstant.MQTT_SERVER_URIS, Type.STRING,
"tcp://localhost:1883", Importance.HIGH,
"mqtt server to connect to")
.define(MqttSourceConstant.MQTT_TOPIC, Type.STRING, "#", Importance.HIGH,
"mqtt server to connect to")
.define(MqttSourceConstant.MQTT_QUALITY_OF_SERVICE, Type.INT, 1, Importance.LOW,
"mqtt qos to use")
.define(MqttSourceConstant.MQTT_SSL_CA_CERT, Type.STRING, null, Importance.LOW,
"CA cert file to use if using ssl",
"SSL", 1, ConfigDef.Width.LONG, "CA cert", MODE_SSL_RECOMMENDER)
.define(MqttSourceConstant.MQTT_SSL_CERT, Type.STRING, null, Importance.LOW,
"cert file to use if using ssl",
"SSL", 2, ConfigDef.Width.LONG, "Cert", MODE_SSL_RECOMMENDER)
.define(MqttSourceConstant.MQTT_SSL_PRIV_KEY, Type.STRING, null, Importance.LOW,
"cert priv key to use if using ssl",
"SSL", 3, ConfigDef.Width.LONG, "Key", MODE_SSL_RECOMMENDER)
.define(MqttSourceConstant.MQTT_USERNAME, Type.STRING, null, Importance.MEDIUM,
"username to authenticate to mqtt broker")
.define(MqttSourceConstant.MQTT_PASSWORD, Type.STRING, null, Importance.MEDIUM,
"password to authenticate to mqtt broker")
.define(MqttSourceConstant.MESSAGE_PROCESSOR, Type.CLASS,
DumbProcessor.class, Importance.HIGH,
"message processor to use");
}
public static ConfigDef conf() {
return new ConfigDef()
.define(PORT_CONF, Type.INT, 8088, Importance.HIGH, PORT_DOC)
.define(SSL_RENEGOTIATION_ALLOWED_CONF, Type.BOOLEAN, true, Importance.LOW, SSL_RENEGOTIATION_ALLOWED_DOC)
.define(KEYSTORE_PATH_CONF, Type.STRING, Importance.HIGH, KEYSTORE_PATH_DOC)
.define(KEYSTORE_PASSWORD_CONF, Type.PASSWORD, Importance.HIGH, KEYSTORE_PASSWORD_DOC)
.define(EVENT_COLLECTOR_URL_CONF, Type.STRING, "/services/collector/event", Importance.LOW, EVENT_COLLECTOR_URL_DOC)
.define(EVENT_COLLECTOR_INDEX_ALLOWED_CONF, Type.LIST, new ArrayList<>(), Importance.LOW, EVENT_COLLECTOR_INDEX_ALLOWED_DOC)
.define(EVENT_COLLECTOR_INDEX_DEFAULT_CONF, Type.STRING, Importance.HIGH, EVENT_COLLECTOR_INDEX_DEFAULT_DOC)
.define(TOPIC_PER_INDEX_CONF, Type.BOOLEAN, false, Importance.MEDIUM, TOPIC_PER_INDEX_DOC)
.define(TOPIC_PREFIX_CONF, Type.STRING, Importance.HIGH, TOPIC_PREFIX_DOC)
.define(BATCH_SIZE_CONF, Type.INT, 10000, Importance.LOW, BATCH_SIZE_DOC)
.define(BACKOFF_MS_CONF, Type.INT, 100, Importance.LOW, BACKOFF_MS_DOC);
}
public static ConfigDef conf() {
return new ConfigDef()
.define(AUTHORIZATION_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, AUTHORIZATION_TOKEN_DOC)
.define(REMOTE_HOST_CONF, Type.STRING, Importance.HIGH, REMOTE_HOST_DOC)
.define(REMOTE_PORT_CONF, Type.INT, 8088, Importance.MEDIUM, REMOTE_PORT_DOC)
.define(SSL_CONF, Type.BOOLEAN, true, Importance.HIGH, SSL_DOC)
.define(SSL_VALIDATE_CERTIFICATES_CONF, Type.BOOLEAN, true, Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
.define(SSL_TRUSTSTORE_PATH_CONF, Type.STRING, "", Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_PASSWORD_CONF, Type.PASSWORD, "", Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
.define(CONNECT_TIMEOUT_CONF, Type.INT, 20000, Importance.LOW, CONNECT_TIMEOUT_DOC)
.define(READ_TIMEOUT_CONF, Type.INT, 30000, Importance.LOW, READ_TIMEOUT_DOC)
.define(CURL_LOGGING_ENABLED_CONF, Type.BOOLEAN, false, Importance.LOW, CURL_LOGGING_ENABLED_DOC);
}
public static ConfigDef config() {
return new ConfigDef()
.define(
ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
.documentation(MY_SETTING_DOC)
.importance(Importance.HIGH)
.build()
);
}
public static ConfigDef config() {
return new ConfigDef()
.define(
ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
.documentation(MY_SETTING_DOC)
.importance(Importance.HIGH)
.build()
);
}
public static ConfigDef config() {
return new ConfigDef()
.define(
ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
.documentation(MY_SETTING_DOC)
.importance(Importance.HIGH)
.build()
);
}
public static ConfigDef baseConfigDef(
int port,
String listeners,
String responseMediatypePreferred,
String responseMediatypeDefault,
String metricsJmxPrefix
) {
return incompleteBaseConfigDef()
.define(
PORT_CONFIG,
Type.INT,
port,
Importance.LOW,
PORT_CONFIG_DOC
).define(
LISTENERS_CONFIG,
Type.LIST,
listeners,
Importance.HIGH,
LISTENERS_DOC
).define(
RESPONSE_MEDIATYPE_PREFERRED_CONFIG,
Type.LIST,
responseMediatypePreferred,
Importance.LOW,
RESPONSE_MEDIATYPE_PREFERRED_CONFIG_DOC
).define(
RESPONSE_MEDIATYPE_DEFAULT_CONFIG,
Type.STRING,
responseMediatypeDefault,
Importance.LOW,
RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC
).define(
METRICS_JMX_PREFIX_CONFIG,
Type.STRING,
metricsJmxPrefix,
Importance.LOW,
METRICS_JMX_PREFIX_DOC
);
}
private static ConfigDef createConfigDef() {
ConfigDef configDef =
new ConfigDef() {
@Override
@SuppressWarnings("unchecked")
public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
Map<String, ConfigValue> results = super.validateAll(props);
// Don't validate child configs if the top level configs are broken
if (results.values().stream().anyMatch((c) -> !c.errorMessages().isEmpty())) {
return results;
}
boolean hasTopicsConfig = !props.getOrDefault(TOPICS_CONFIG, "").trim().isEmpty();
boolean hasTopicsRegexConfig =
!props.getOrDefault(TOPICS_REGEX_CONFIG, "").trim().isEmpty();
if (hasTopicsConfig && hasTopicsRegexConfig) {
results
.get(TOPICS_CONFIG)
.addErrorMessage(
format(
"%s and %s are mutually exclusive options, but both are set.",
TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
} else if (!hasTopicsConfig && !hasTopicsRegexConfig) {
results
.get(TOPICS_CONFIG)
.addErrorMessage(
format("Must configure one of %s or %s", TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
}
if (hasTopicsConfig) {
List<String> topics = (List<String>) results.get(TOPICS_CONFIG).value();
topics.forEach(
topic -> results.putAll(MongoSinkTopicConfig.validateAll(topic, props)));
} else if (hasTopicsRegexConfig) {
results.putAll(MongoSinkTopicConfig.validateRegexAll(props));
}
return results;
}
};
String group = "Connection";
int orderInGroup = 0;
configDef.define(
TOPICS_CONFIG,
Type.LIST,
TOPICS_DEFAULT,
Importance.HIGH,
TOPICS_DOC,
group,
++orderInGroup,
Width.MEDIUM,
TOPICS_DISPLAY);
configDef.define(
TOPICS_REGEX_CONFIG,
Type.STRING,
TOPICS_REGEX_DEFAULT,
Validators.isAValidRegex(),
Importance.HIGH,
TOPICS_REGEX_DOC,
group,
++orderInGroup,
Width.MEDIUM,
TOPICS_REGEX_DISPLAY);
configDef.define(
CONNECTION_URI_CONFIG,
Type.STRING,
CONNECTION_URI_DEFAULT,
errorCheckingValueValidator("A valid connection string", ConnectionString::new),
Importance.HIGH,
CONNECTION_URI_DOC,
group,
++orderInGroup,
Width.MEDIUM,
CONNECTION_URI_DISPLAY);
group = "Overrides";
orderInGroup = 0;
configDef.define(
TOPIC_OVERRIDE_CONFIG,
Type.STRING,
TOPIC_OVERRIDE_DEFAULT,
Validators.topicOverrideValidator(),
Importance.LOW,
TOPIC_OVERRIDE_DOC,
group,
++orderInGroup,
Width.MEDIUM,
TOPIC_OVERRIDE_DISPLAY);
MongoSinkTopicConfig.BASE_CONFIG.configKeys().values().forEach(configDef::define);
return configDef;
}
private static void addDatabaseOptions(ConfigDef config) {
int orderInGroup = 0;
config.define(
ES_HOST_CONF,
Type.STRING,
Importance.HIGH,
ES_HOST_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.LONG,
ES_HOST_DISPLAY,
Collections.singletonList(INDEX_PREFIX_CONFIG)
).define(
ES_PORT_CONF,
Type.STRING,
Importance.HIGH,
ES_PORT_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.LONG,
ES_PORT_DISPLAY,
Collections.singletonList(INDEX_PREFIX_CONFIG)
).define(
ES_USER_CONF,
Type.STRING,
null,
Importance.HIGH,
ES_USER_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.LONG,
ES_USER_DISPLAY
).define(
ES_PWD_CONF,
Type.STRING,
null,
Importance.HIGH,
ES_PWD_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.SHORT,
ES_PWD_DISPLAY
).define(
CONNECTION_ATTEMPTS_CONFIG,
Type.STRING,
CONNECTION_ATTEMPTS_DEFAULT,
Importance.LOW,
CONNECTION_ATTEMPTS_DOC,
DATABASE_GROUP,
++orderInGroup,
ConfigDef.Width.SHORT,
CONNECTION_ATTEMPTS_DISPLAY
).define(
CONNECTION_BACKOFF_CONFIG,
Type.STRING,
CONNECTION_BACKOFF_DEFAULT,
Importance.LOW,
CONNECTION_BACKOFF_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.SHORT,
CONNECTION_BACKOFF_DISPLAY
).define(
INDEX_PREFIX_CONFIG,
Type.STRING,
INDEX_PREFIX_DEFAULT,
Importance.MEDIUM,
INDEX_PREFIX_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.LONG,
INDEX_PREFIX_DISPLAY
);
}
public static ConfigDef conf() {
String group = "AWS";
int orderInGroup = 0;
return new ConfigDef()
.define(REGION_CONFIG,
Type.STRING,
NO_DEFAULT_VALUE,
new RegionValidator(),
Importance.HIGH,
REGION_DOC_CONFIG,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
REGION_DISPLAY_CONFIG,
new RegionRecommender())
.define(CREDENTIALS_PROVIDER_CLASS_CONFIG,
Type.CLASS,
CREDENTIALS_PROVIDER_CLASS_DEFAULT,
new CredentialsProviderValidator(),
Importance.HIGH,
CREDENTIALS_PROVIDER_DOC_CONFIG,
group,
++orderInGroup,
ConfigDef.Width.MEDIUM,
CREDENTIALS_PROVIDER_DISPLAY_CONFIG)
.define(FUNCTION_NAME_CONFIG,
Type.STRING,
NO_DEFAULT_VALUE,
Importance.HIGH,
FUNCTION_NAME_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
FUNCTION_NAME_DISPLAY)
.define(RETRY_BACKOFF_CONFIG,
Type.LONG,
RETRY_BACKOFF_DEFAULT,
Importance.LOW,
RETRY_BACKOFF_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
RETRY_BACKOFF_DISPLAY)
.define(INVOCATION_TYPE_CONFIG,
Type.STRING,
INVOCATION_TYPE_DEFAULT,
new InvocationTypeValidator(),
Importance.LOW,
INVOCATION_TYPE_DOC_CONFIG,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
INVOCATION_TYPE_DISPLAY_CONFIG,
new InvocationTypeRecommender())
.define(PAYLOAD_CONVERTER_CONFIG,
Type.CLASS,
PAYLOAD_CONVERTER_DEFAULT,
new PayloadConverterValidator(),
Importance.LOW,
PAYLOAD_CONVERTER_DOC_CONFIG,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
PAYLOAD_CONVERTER_DISPLAY_CONFIG,
new PayloadConverterRecommender())
;
}
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
String group = "REST_HTTP";
int orderInGroup = 0;
ConfigDef config = new ConfigDef()
.define(REQUEST_BODY_CONFIG,
Type.STRING,
REQUEST_BODY_DEFAULT,
Importance.LOW,
REQUEST_BODY_DOC,
group,
++orderInGroup,
ConfigDef.Width.LONG,
REQUEST_BODY_DISPLAY)
.define(REQUEST_PARAMETER_NAMES_CONFIG,
Type.LIST,
REQUEST_PARAMETER_NAMES_DEFAULT,
Importance.HIGH,
REQUEST_PARAMETER_NAMES_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
REQUEST_PARAMETER_NAMES_DISPLAY)
.define(REQUEST_HEADERS_CONFIG,
Type.LIST,
REQUEST_HEADERS_DEFAULT,
Importance.HIGH,
REQUEST_HEADERS_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
REQUEST_HEADERS_DISPLAY)
;
// This is a bit hacky and there may be a better way of doing it, but I don't know it.
// We need to create config items dynamically, based on the parameter names,
// so we need a 2 pass parse of the config.
List<String> paramNames = (List) config.parse(unparsedConfig).get(REQUEST_PARAMETER_NAMES_CONFIG);
for(String paramName : paramNames) {
config.define(String.format(REQUEST_PARAMETER_VALUE_CONFIG, paramName),
Type.STRING,
REQUEST_PARAMETER_VALUE_DEFAULT,
Importance.HIGH,
String.format(REQUEST_PARAMETER_VALUE_DOC, paramName),
group,
++orderInGroup,
ConfigDef.Width.SHORT,
String.format(REQUEST_PARAMETER_VALUE_DISPLAY, paramName));
}
return(config);
}
public static ConfigDef conf(Map<String, ?> unparsedConfig) {
String group = "REST_HTTP";
int orderInGroup = 0;
ConfigDef config = new ConfigDef()
.define(VALUE_PROVIDER_CONFIG,
Type.CLASS,
VALUE_PROVIDER_DEFAULT,
new InstanceOfValidator(ValueProvider.class),
Importance.HIGH,
VALUE_PROVIDER_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
VALUE_PROVIDER_DISPLAY,
new ServiceProviderInterfaceRecommender(ValueProvider.class))
.define(TEMPLATE_ENGINE_CONFIG,
Type.CLASS,
TEMPLATE_ENGINE_DEFAULT,
new InstanceOfValidator(TemplateEngine.class),
Importance.HIGH,
TEMPLATE_ENGINE_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
TEMPLATE_ENGINE_DISPLAY,
new ServiceProviderInterfaceRecommender(TemplateEngine.class))
.define(REQUEST_BODY_TEMPLATE_CONFIG,
Type.STRING,
REQUEST_BODY_TEMPLATE_DEFAULT,
Importance.LOW,
REQUEST_BODY_TEMPLATE_DOC,
group,
++orderInGroup,
ConfigDef.Width.LONG,
REQUEST_BODY_TEMPLATE_DISPLAY)
.define(REQUEST_PARAMETER_NAMES_CONFIG,
Type.LIST,
REQUEST_PARAMETER_NAMES_DEFAULT,
Importance.LOW,
REQUEST_PARAMETER_NAMES_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
REQUEST_PARAMETER_NAMES_DISPLAY)
.define(REQUEST_HEADERS_TEMPLATE_CONFIG,
Type.LIST,
REQUEST_HEADERS_TEMPLATE_DEFAULT,
Importance.LOW,
REQUEST_HEADERS_TEMPLATE_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
REQUEST_HEADERS_TEMPLATE_DISPLAY)
;
// This is a bit hacky and there may be a better way of doing it, but I don't know it.
// We need to create config items dynamically, based on the parameter names,
// so we need a 2 pass parse of the config.
List<String> paramNames = (List) config.parse(unparsedConfig).get(REQUEST_PARAMETER_NAMES_CONFIG);
for(String paramName : paramNames) {
config.define(String.format(REQUEST_PARAMETER_TEMPLATE_CONFIG, paramName),
Type.STRING,
REQUEST_PARAMETER_TEMPLATE_DEFAULT,
Importance.HIGH,
String.format(REQUEST_PARAMETER_TEMPLATE_DOC, paramName),
group,
++orderInGroup,
ConfigDef.Width.SHORT,
String.format(REQUEST_PARAMETER_TEMPLATE_DISPLAY, paramName));
}
return(config);
}
public static ConfigDef conf() {
String group = "REST_HTTP";
int orderInGroup = 0;
return new ConfigDef()
.define(HTTP_CONNECTION_TIMEOUT_CONFIG,
Type.LONG,
HTTP_CONNECTION_TIMEOUT_DEFAULT,
Importance.LOW,
HTTP_CONNECTION_TIMEOUT_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
HTTP_CONNECTION_TIMEOUT_DISPLAY)
.define(HTTP_READ_TIMEOUT_CONFIG,
Type.LONG,
HTTP_READ_TIMEOUT_DEFAULT,
Importance.LOW,
HTTP_READ_TIMEOUT_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
HTTP_READ_TIMEOUT_DISPLAY)
.define(HTTP_KEEP_ALIVE_DURATION_CONFIG,
Type.LONG,
HTTP_KEEP_ALIVE_DURATION_DEFAULT,
Importance.LOW,
HTTP_KEEP_ALIVE_DURATION_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
HTTP_KEEP_ALIVE_DURATION_DISPLAY)
.define(HTTP_MAX_IDLE_CONNECTION_CONFIG,
Type.INT,
HTTP_MAX_IDLE_CONNECTION_DEFAULT,
Importance.LOW,
HTTP_MAX_IDLE_CONNECTION_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
HTTP_MAX_IDLE_CONNECTION_DISPLAY)
;
}
public static ConfigDef conf() {
String group = "REST";
int orderInGroup = 0;
return new ConfigDef()
.define(SINK_METHOD_CONFIG,
Type.STRING,
SINK_METHOD_DEFAULT,
new MethodValidator(),
Importance.HIGH,
SINK_METHOD_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SINK_METHOD_DISPLAY,
new MethodRecommender())
.define(SINK_HEADERS_LIST_CONFIG,
Type.LIST,
Collections.EMPTY_LIST,
Importance.HIGH,
SINK_HEADERS_LIST_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SINK_HEADERS_LIST_DISPLAY)
.define(SINK_URL_CONFIG,
Type.STRING,
NO_DEFAULT_VALUE,
Importance.HIGH,
SINK_URL_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SINK_URL_DISPLAY)
.define(SINK_RETRY_BACKOFF_CONFIG,
Type.LONG,
SINK_RETRY_BACKOFF_DEFAULT,
Importance.LOW,
SINK_RETRY_BACKOFF_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_RETRY_BACKOFF_DISPLAY)
.define(SINK_HTTP_MAX_RETRIES_CONFIG,
Type.INT,
SINK_HTTP_MAX_RETRIES_DEFAULT,
Importance.LOW,
SINK_HTTP_MAX_RETRIES_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_HTTP_MAX_RETRIES_DISPLAY)
.define(SINK_HTTP_CODES_WHITELIST_CONFIG,
Type.STRING,
SINK_HTTP_CODES_WHITELIST_DEFAULT,
Importance.LOW,
SINK_HTTP_CODES_WHITELIST_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_HTTP_CODES_WHITELIST_DISPLAY)
.define(SINK_HTTP_CODES_BLACKLIST_CONFIG,
Type.STRING,
SINK_HTTP_CODES_BLACKLIST_DEFAULT,
Importance.LOW,
SINK_HTTP_CODES_BLACKLIST_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_HTTP_CODES_BLACKLIST_DISPLAY)
.define(SINK_REQUEST_EXECUTOR_CONFIG,
Type.CLASS,
SINK_REQUEST_EXECUTOR_DEFAULT,
Importance.LOW,
SINK_REQUEST_EXECUTOR_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_REQUEST_EXECUTOR_DISPLAY)
.define(SINK_DATE_FORMAT_CONFIG,
Type.STRING,
SINK_DATE_FORMAT_DEFAULT,
Importance.LOW,
SINK_DATE_FORMAT_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_DATE_FORMAT_DISPLAY)
;
}
public static ConfigDef conf() {
String group = "REST";
int orderInGroup = 0;
return new ConfigDef()
.define(SOURCE_POLL_INTERVAL_CONFIG,
Type.LONG,
SOURCE_POLL_INTERVAL_DEFAULT,
Importance.LOW,
SOURCE_POLL_INTERVAL_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SOURCE_POLL_INTERVAL_DISPLAY)
.define(SOURCE_METHOD_CONFIG,
Type.STRING,
SOURCE_METHOD_DEFAULT,
new MethodValidator(),
Importance.HIGH,
SOURCE_METHOD_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SOURCE_METHOD_DISPLAY,
new MethodRecommender())
.define(SOURCE_URL_CONFIG,
Type.STRING,
NO_DEFAULT_VALUE,
Importance.HIGH,
SOURCE_URL_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SOURCE_URL_DISPLAY)
.define(SOURCE_PAYLOAD_GENERATOR_CONFIG,
Type.CLASS,
SOURCE_PAYLOAD_GENERATOR_DEFAULT,
new InstanceOfValidator(PayloadGenerator.class),
Importance.HIGH,
SOURCE_PAYLOAD_GENERATOR_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SOURCE_PAYLOAD_GENERATOR_DISPLAY,
new ServiceProviderInterfaceRecommender<>(PayloadGenerator.class))
.define(SOURCE_TOPIC_SELECTOR_CONFIG,
Type.CLASS,
SOURCE_TOPIC_SELECTOR_DEFAULT,
new InstanceOfValidator(TopicSelector.class),
Importance.HIGH,
SOURCE_TOPIC_SELECTOR_DOC,
group,
++orderInGroup,
ConfigDef.Width.SHORT,
SOURCE_TOPIC_SELECTOR_DISPLAY,
new ServiceProviderInterfaceRecommender<>(TopicSelector.class))
.define(SOURCE_REQUEST_EXECUTOR_CONFIG,
Type.CLASS,
SOURCE_REQUEST_EXECUTOR_DEFAULT,
new InstanceOfValidator(RequestExecutor.class),
Importance.LOW,
SOURCE_REQUEST_EXECUTOR_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SOURCE_REQUEST_EXECUTOR_DISPLAY,
new ServiceProviderInterfaceRecommender<>(RequestExecutor.class))
;
}
private static void defineMessageGroup(final ConfigDef definitions) {
int order = 0;
definitions
.define(
MESSAGE_PATH_NAME_CONFIG,
Type.STRING,
MESSAGE_PATH_NAME_DEFAULT,
Importance.HIGH,
MESSAGE_PATH_NAME_DOC,
MESSAGE_CONFIG_GROUP,
++order,
Width.SHORT,
"Message name jsonpath query")
.define(
MESSAGE_PATH_KEY_CONFIG,
Type.STRING,
MESSAGE_PATH_KEY_DEFAULT,
Importance.HIGH,
MESSAGE_PATH_KEY_DOC,
MESSAGE_CONFIG_GROUP,
++order,
Width.SHORT,
"Correlation key jsonpath query")
.define(
MESSAGE_PATH_VARIABLES_CONFIG,
Type.STRING,
MESSAGE_PATH_VARIABLES_DEFAULT,
Importance.MEDIUM,
MESSAGE_PATH_VARIABLES_DOC,
MESSAGE_CONFIG_GROUP,
++order,
Width.SHORT,
"Message variables jsonpath query")
.define(
MESSAGE_PATH_TTL_CONFIG,
Type.STRING,
MESSAGE_PATH_TTL_DEFAULT,
Importance.LOW,
MESSAGE_PATH_TTL_DOC,
MESSAGE_CONFIG_GROUP,
++order,
Width.SHORT,
"Message TTL jsonpath query");
}
private static void defineWorkerGroup(final ConfigDef definitions) {
int order = 0;
definitions
.define(
WORKER_NAME_CONFIG,
Type.STRING,
WORKER_NAME_DEFAULT,
Importance.LOW,
WORKER_NAME_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.SHORT,
"Name")
.define(
MAX_JOBS_TO_ACTIVATE_CONFIG,
Type.INT,
MAX_JOBS_TO_ACTIVATE_DEFAULT,
Importance.MEDIUM,
MAX_JOBS_TO_ACTIVATE_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.SHORT,
"Max jobs to activate")
.define(
JOB_TIMEOUT_CONFIG,
Type.LONG,
JOB_TIMEOUT_DEFAULT,
Importance.MEDIUM,
JOB_TIMEOUT_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.SHORT,
"Job timeout")
.define(
JOB_TYPES_CONFIG,
Type.LIST,
JOB_TYPES_DEFAULT,
Importance.HIGH,
JOB_TYPES_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.LONG,
"Job types")
.define(
JOB_HEADER_TOPICS_CONFIG,
Type.STRING,
JOB_HEADER_TOPICS_DEFAULT,
Importance.HIGH,
JOB_HEADER_TOPICS_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.SHORT,
"Job topics header")
.define(
JOB_VARIABLES_CONFIG,
Type.LIST,
JOB_VARIABLES_DEFAULT,
Importance.LOW,
JOB_VARIABLES_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.SHORT,
"Job variables");
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(
ConnectorUtils.CPS_PROJECT_CONFIG,
Type.STRING,
Importance.HIGH,
"The project containing the topic to which to publish.")
.define(
ConnectorUtils.CPS_TOPIC_CONFIG,
Type.STRING,
Importance.HIGH,
"The topic to which to publish.")
.define(
MAX_BUFFER_SIZE_CONFIG,
Type.INT,
DEFAULT_MAX_BUFFER_SIZE,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum number of messages that can be received for the messages on a topic "
+ "partition before publishing them to Cloud Pub/Sub.")
.define(
MAX_BUFFER_BYTES_CONFIG,
Type.LONG,
DEFAULT_MAX_BUFFER_BYTES,
ConfigDef.Range.between(1, DEFAULT_MAX_BUFFER_BYTES),
Importance.MEDIUM,
"The maximum number of bytes that can be received for the messages on a topic "
+ "partition before publishing the messages to Cloud Pub/Sub.")
.define(
MAX_DELAY_THRESHOLD_MS,
Type.INT,
DEFAULT_DELAY_THRESHOLD_MS,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait after receiving the first message in a batch for a "
+ "before publishing the messages to Cloud Pub/Sub.")
.define(
MAX_REQUEST_TIMEOUT_MS,
Type.INT,
DEFAULT_REQUEST_TIMEOUT_MS,
ConfigDef.Range.between(10000, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait for a single publish request to Cloud Pub/Sub.")
.define(
MAX_TOTAL_TIMEOUT_MS,
Type.INT,
DEFAULT_TOTAL_TIMEOUT_MS,
ConfigDef.Range.between(10000, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait for a publish to complete (including retries) in "
+ "Cloud Pub/Sub.")
.define(
MAX_SHUTDOWN_TIMEOUT_MS,
Type.INT,
DEFAULT_SHUTDOWN_TIMEOUT_MS,
ConfigDef.Range.between(10000, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait for a publisher to shutdown when stopping task "
+ "in Kafka Connect.")
.define(
PUBLISH_KAFKA_METADATA,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"When true, include the Kafka topic, partition, offset, and timestamp as message "
+ "attributes when a message is published to Cloud Pub/Sub.")
.define(
PUBLISH_KAFKA_HEADERS,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"When true, include any headers as attributes when a message is published to Cloud Pub/Sub.")
.define(CPS_MESSAGE_BODY_NAME,
Type.STRING,
DEFAULT_MESSAGE_BODY_NAME,
Importance.MEDIUM,
"When using a struct or map value schema, this field or key name indicates that the "
+ "corresponding value will go into the Pub/Sub message body.")
.define(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"The path to the GCP credentials file")
.define(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"GCP JSON credentials");
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(
KAFKA_TOPIC_CONFIG,
Type.STRING,
Importance.HIGH,
"The topic in Kafka which will receive messages that were pulled from Cloud Pub/Sub.")
.define(
ConnectorUtils.CPS_PROJECT_CONFIG,
Type.STRING,
Importance.HIGH,
"The project containing the topic from which to pull messages.")
.define(
CPS_SUBSCRIPTION_CONFIG,
Type.STRING,
Importance.HIGH,
"The name of the subscription to Cloud Pub/Sub.")
.define(
CPS_MAX_BATCH_SIZE_CONFIG,
Type.INT,
DEFAULT_CPS_MAX_BATCH_SIZE,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The minimum number of messages to batch per pull request to Cloud Pub/Sub.")
.define(
KAFKA_MESSAGE_KEY_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
"The Cloud Pub/Sub message attribute to use as a key for messages published to Kafka.")
.define(
KAFKA_MESSAGE_TIMESTAMP_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
"The optional Cloud Pub/Sub message attribute to use as a timestamp for messages "
+ "published to Kafka. The timestamp is Long value.")
.define(
KAFKA_PARTITIONS_CONFIG,
Type.INT,
DEFAULT_KAFKA_PARTITIONS,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The number of Kafka partitions for the Kafka topic in which messages will be "
+ "published to.")
.define(
KAFKA_PARTITION_SCHEME_CONFIG,
Type.STRING,
DEFAULT_KAFKA_PARTITION_SCHEME,
new PartitionScheme.Validator(),
Importance.MEDIUM,
"The scheme for assigning a message to a partition in Kafka.")
.define(
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"The path to the GCP credentials file")
.define(
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
Type.STRING,
null,
Importance.HIGH,
"GCP JSON credentials")
.define(
USE_KAFKA_HEADERS,
Type.BOOLEAN,
false,
Importance.LOW,
"Use Kafka record headers to store Pub/Sub message attributes");
}
/**
* @param configDef - The configuration schema definition that will be updated to include
* this configuration's fields. (not null)
*/
public static void addCommonDefinitions(final ConfigDef configDef) {
requireNonNull(configDef);
configDef.define(RYA_INSTANCE_NAME, Type.STRING, Importance.HIGH, RYA_INSTANCE_NAME_DOC);
}