下面列出了如何使用 org.apache.kafka.common.Configurable 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Loads the class named {@code className}, instantiates it through
 * {@code Utils.newInstance}, and, when the new object implements
 * {@link Configurable}, passes {@code configs} to its {@code configure} method.
 *
 * @param className fully qualified name handed to {@link Class#forName(String)}
 * @param configs   configuration map forwarded to {@code configure}
 * @param <T>       type the caller expects; the cast of the loaded class to
 *                  {@code Class<T>} is unchecked, so a mismatch only surfaces
 *                  where the caller uses the result
 * @return the newly created (and possibly configured) instance
 * @throws RuntimeException wrapping the {@link ClassNotFoundException} when the
 *                          class cannot be located
 */
public static <T> T getConfiguredInstance(String className, Map<String, ?> configs) {
    try {
        // Class.forName either returns a Class object or throws; it never yields
        // null, so no null check is needed on the result.
        @SuppressWarnings("unchecked")
        Class<T> cls = (Class<T>) Class.forName(className);
        Object o = Utils.newInstance(cls);
        if (o instanceof Configurable) {
            ((Configurable) o).configure(configs);
        }
        return cls.cast(o);
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Creates the payload formatter whose class is stored under
 * {@code FORMATTER_CLASS_KEY}, using its no-argument constructor.
 *
 * <p>If the formatter implements {@link Configurable}, it is configured with
 * the map returned by {@code originalsWithPrefix(FORMATTER_PREFIX)}.
 *
 * @return the new formatter instance
 * @throws ConnectException if reflective construction of the formatter fails
 */
@SuppressWarnings("unchecked")
public PayloadFormatter getPayloadFormatter() {
    try {
        final Class<? extends PayloadFormatter> formatterClass =
            (Class<? extends PayloadFormatter>) getClass(FORMATTER_CLASS_KEY);
        final PayloadFormatter formatter = formatterClass.getDeclaredConstructor().newInstance();
        if (formatter instanceof Configurable) {
            final Map<String, Object> formatterConfigs = originalsWithPrefix(FORMATTER_PREFIX);
            final Configurable configurable = (Configurable) formatter;
            configurable.configure(formatterConfigs);
        }
        return formatter;
    } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) {
        throw new ConnectException("Unable to create " + FORMATTER_CLASS_KEY, e);
    }
}
/**
 * Creates the AWS credentials provider whose class is stored under
 * {@code CREDENTIALS_PROVIDER_CLASS_KEY}, using its no-argument constructor.
 *
 * <p>If the provider implements {@link Configurable}, it is configured with the
 * map returned by {@code originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX)}.
 *
 * @return the new credentials provider
 * @throws ConnectException if reflective construction of the provider fails
 */
@SuppressWarnings("unchecked")
AWSCredentialsProvider loadAwsCredentialsProvider() {
    try {
        final Class<? extends AWSCredentialsProvider> providerClass =
            (Class<? extends AWSCredentialsProvider>) getClass(CREDENTIALS_PROVIDER_CLASS_KEY);
        final AWSCredentialsProvider provider = providerClass.getDeclaredConstructor().newInstance();
        if (provider instanceof Configurable) {
            final Map<String, Object> providerConfigs =
                originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX);
            final Configurable configurable = (Configurable) provider;
            configurable.configure(providerConfigs);
        }
        return provider;
    } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) {
        throw new ConnectException("Unable to create " + CREDENTIALS_PROVIDER_CLASS_KEY, e);
    }
}
/**
 * Instantiates the class configured under {@code key} and checks that it is an
 * instance of {@code t}.
 *
 * <p>When the new object implements {@link Configurable}, it is configured with
 * the result of {@code configsWithCurrentProducer(producer)}.
 *
 * @param key      configuration key whose value is looked up via {@code getClass}
 * @param t        type the created object must be an instance of
 * @param producer producer passed on to {@code configsWithCurrentProducer}
 * @param <T>      expected type of the result
 * @return the created instance, or {@code null} when no class is set for {@code key}
 * @throws KafkaException if the created object is not an instance of {@code t}
 */
public <T> T getConfiguredInstance(String key, Class<T> t, Producer<byte[], byte[]> producer) {
    final Class<?> configuredClass = getClass(key);
    if (configuredClass != null) {
        final Object instance = Utils.newInstance(configuredClass);
        if (t.isInstance(instance)) {
            if (instance instanceof Configurable) {
                final Configurable configurable = (Configurable) instance;
                configurable.configure(configsWithCurrentProducer(producer));
            }
            return t.cast(instance);
        }
        throw new KafkaException(configuredClass.getName() + " is not an instance of " + t.getName());
    }
    return null;
}
/**
 * Creates an AWS credentials provider from the supplied configuration map.
 *
 * <p>The provider class name is read from the {@code "class"} entry of
 * {@code configs}; when that entry is absent, the value of
 * {@code SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT} is used.
 * The resolved class is instantiated through its no-argument constructor and,
 * if it implements {@link Configurable}, configured with {@code configs}.
 *
 * @param configs configuration entries for the provider
 * @return the new (and possibly configured) credentials provider
 * @throws ConnectException if the provider cannot be constructed reflectively,
 *                          including when its constructor itself throws
 */
@SuppressWarnings("unchecked")
public AWSCredentialsProvider getCredentialsProvider(Map<String, ?> configs) {
    // NOTE(review): the whole config map is logged at WARN level; if it can carry
    // credential values this may leak secrets into logs -- confirm and consider masking.
    log.warn(".get-credentials-provider:configs={}", configs);
    try {
        Object providerField = configs.get("class");
        String providerClass = SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue();
        if (null != providerField) {
            providerClass = providerField.toString();
        }
        log.warn(".get-credentials-provider:field={}, class={}", providerField, providerClass);
        // Class.newInstance() is deprecated: it rethrows any exception raised by the
        // constructor (checked ones included) without wrapping it. Going through the
        // declared constructor wraps those in InvocationTargetException instead.
        AWSCredentialsProvider provider = ((Class<? extends AWSCredentialsProvider>)
            getClass(providerClass)).getDeclaredConstructor().newInstance();
        if (provider instanceof Configurable) {
            ((Configurable) provider).configure(configs);
        }
        log.warn(".get-credentials-provider:provider={}", provider);
        return provider;
    } catch (ReflectiveOperationException e) {
        // Covers IllegalAccessException, InstantiationException,
        // InvocationTargetException and NoSuchMethodException.
        throw new ConnectException(
            "Invalid class for: " + SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG,
            e
        );
    }
}
/**
 * Creates the AWS credentials provider whose class is stored under
 * {@code CREDENTIALS_PROVIDER_CLASS_CONFIG}, using its no-argument constructor.
 *
 * <p>If the provider implements {@link Configurable}, it is configured with the
 * map returned by {@code originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX)},
 * after removing the entry keyed by the class-config name with the prefix
 * stripped off.
 *
 * @return the new credentials provider
 * @throws ConnectException if reflective construction of the provider fails
 */
@SuppressWarnings("unchecked")
public AWSCredentialsProvider getAwsCredentialsProvider() {
    try {
        final Class<? extends AWSCredentialsProvider> providerClass =
            (Class<? extends AWSCredentialsProvider>) getClass(CREDENTIALS_PROVIDER_CLASS_CONFIG);
        final AWSCredentialsProvider provider = providerClass.getDeclaredConstructor().newInstance();
        if (provider instanceof Configurable) {
            final Map<String, Object> providerConfigs =
                originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX);
            // Key of the class setting as it appears once the prefix is stripped.
            final String unprefixedClassKey =
                CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length());
            providerConfigs.remove(unprefixedClassKey);
            final Configurable configurable = (Configurable) provider;
            configurable.configure(providerConfigs);
        }
        return provider;
    } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) {
        throw new ConnectException("Invalid class for: " + CREDENTIALS_PROVIDER_CLASS_CONFIG, e);
    }
}
/**
 * Instantiates {@code cls} through {@code Utils.newInstance} and, when the new
 * object implements {@link Configurable}, passes {@code configs} to its
 * {@code configure} method.
 *
 * @param cls     class to instantiate; may be {@code null}
 * @param configs configuration map forwarded to {@code configure}
 * @param <T>     type of the created instance
 * @return the created instance, or {@code null} when {@code cls} is {@code null}
 */
public static <T> T getConfiguredInstance(Class<T> cls, Map<String, ?> configs) {
    if (cls != null) {
        final Object instance = Utils.newInstance(cls);
        if (instance instanceof Configurable) {
            final Configurable configurable = (Configurable) instance;
            configurable.configure(configs);
        }
        return cls.cast(instance);
    }
    return null;
}