下面列出了怎么用com.mongodb.ConnectionString的API类实例代码及写法,或者点击链接到github查看源代码。
private static MongoClient getClient(Boolean useCredentials, String replicaSet) {
MongoClientSettings.Builder settings = MongoClientSettings.builder();
if (useCredentials) {
MongoCredential credentials = MongoCredential.createCredential(
USER, ADMIN_DB, PASSWORD.toCharArray());
settings.credential(credentials);
}
StringBuilder connectionString = new StringBuilder(String.format("mongodb://%s:%d", HOST, PORT));
if (replicaSet != null) {
connectionString.append(String.format("/?replicaSet=%s", REPLICA_SET));
}
ConnectionString uri = new ConnectionString(connectionString.toString());
settings.applyConnectionString(uri);
settings.readPreference(ReadPreference.primaryPreferred());
return MongoClients.create(settings.build());
}
private DatabaseManager() {
final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder()
.codecRegistry(CODEC_REGISTRY)
.applicationName(String.format("Shadbot V%s", Config.VERSION));
if (!Config.IS_SNAPSHOT) {
final String username = CredentialManager.getInstance().get(Credential.DATABASE_USERNAME);
final String pwd = CredentialManager.getInstance().get(Credential.DATABASE_PWD);
final String host = CredentialManager.getInstance().get(Credential.DATABASE_HOST);
final String port = CredentialManager.getInstance().get(Credential.DATABASE_PORT);
if (username != null && pwd != null && host != null && port != null) {
settingsBuilder.applyConnectionString(new ConnectionString(
String.format("mongodb://%s:%[email protected]%s:%s/%s", username, pwd, host, port, Config.DATABASE_NAME)));
}
}
this.client = MongoClients.create(settingsBuilder.build());
final MongoDatabase database = this.client.getDatabase(Config.DATABASE_NAME);
this.premiumCollection = new PremiumCollection(database);
this.guildsCollection = new GuildsCollection(database);
this.lotteryCollection = new LotteryCollection(database);
this.usersCollection = new UsersCollection(database);
}
public SocketSettingsParser(ConnectionString connectionString, JsonObject config) {
SocketSettings.Builder settings = SocketSettings.builder();
if (connectionString != null) {
settings.applyConnectionString(connectionString);
} else {
Integer connectTimeoutMS = config.getInteger("connectTimeoutMS");
if (connectTimeoutMS != null) {
settings.connectTimeout(connectTimeoutMS, MILLISECONDS);
}
Integer socketTimeoutMS = config.getInteger("socketTimeoutMS");
if (socketTimeoutMS != null) {
settings.readTimeout(socketTimeoutMS, MILLISECONDS);
}
Integer receiveBufferSize = config.getInteger("receiveBufferSize");
if (receiveBufferSize != null) {
settings.receiveBufferSize(receiveBufferSize);
}
Integer sendBufferSize = config.getInteger("sendBufferSize");
if (sendBufferSize != null) {
settings.sendBufferSize(sendBufferSize);
}
}
this.settings = settings.build();
}
@Test
public void testConnStringReadPreferenceTags() {
final ConnectionString connString = new ConnectionString("mongodb://localhost:27017/mydb?replicaSet=myapp" +
"&readPreference=nearest" +
"&readPreferenceTags=dc:ny,rack:1" +
"&readPreferenceTags=dc:ny" +
"&readPreferenceTags=");
List<TagSet> tagSets = new ArrayList<>();
List<Tag> tags = new ArrayList<>();
tags.add(new Tag("dc", "ny"));
tags.add(new Tag("rack", "1"));
tagSets.add(new TagSet(tags));
tags = new ArrayList<>();
tags.add(new Tag("dc", "ny"));
tagSets.add(new TagSet(tags));
tagSets.add(new TagSet());
ReadPreference expected = ReadPreference.valueOf("nearest", tagSets);
ReadPreference rp = new ReadPreferenceParser(connString, new JsonObject()).readPreference();
assertNotNull(rp);
assertEquals(expected, rp);
}
protected TestBase() {
Builder builder = MongoClientSettings.builder();
try {
builder.uuidRepresentation(mapperOptions.getUuidRepresentation());
} catch(Exception ignored) {
// not a 4.0 driver
}
MongoClientSettings clientSettings = builder
.applyConnectionString(new ConnectionString(getMongoURI()))
.build();
this.mongoClient = MongoClients.create(clientSettings);
this.database = getMongoClient().getDatabase(TEST_DB_NAME);
this.ds = Morphia.createDatastore(getMongoClient(), database.getName());
ds.setQueryFactory(new DefaultQueryFactory());
}
private MongoDb4Provider(final String connectionStringSource, final boolean isCapped,
final Integer collectionSize) {
LOGGER.debug("Creating ConnectionString {}...", connectionStringSource);
this.connectionString = new ConnectionString(connectionStringSource);
LOGGER.debug("Created ConnectionString {}", connectionString);
LOGGER.debug("Creating MongoClientSettings...");
// @formatter:off
final MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(this.connectionString)
.codecRegistry(CODEC_REGISTRIES)
.build();
// @formatter:on
LOGGER.debug("Created MongoClientSettings {}", settings);
LOGGER.debug("Creating MongoClient {}...", settings);
this.mongoClient = MongoClients.create(settings);
LOGGER.debug("Created MongoClient {}", mongoClient);
String databaseName = this.connectionString.getDatabase();
LOGGER.debug("Getting MongoDatabase {}...", databaseName);
this.mongoDatabase = this.mongoClient.getDatabase(databaseName);
LOGGER.debug("Got MongoDatabase {}", mongoDatabase);
this.isCapped = isCapped;
this.collectionSize = collectionSize;
}
static MongoClient mongoClient(String connectionString, int connectionTimeoutSeconds) {
MongoClientSettings settings = MongoClientSettings
.builder()
.applyConnectionString(new ConnectionString(connectionString))
.applyToClusterSettings(b -> {
b.serverSelectionTimeout(connectionTimeoutSeconds, SECONDS);
})
.build();
return MongoClients.create(settings);
}
private MongoSourceConfig(final Map<?, ?> originals, final boolean validateAll) {
super(CONFIG, originals, false);
connectionString = new ConnectionString(getString(CONNECTION_URI_CONFIG));
if (validateAll) {
INITIALIZERS.forEach(i -> i.accept(this));
}
}
@Bean
MongoClient mongoClient(MongoProperties properties) {
ConnectionString connectionString = new ConnectionString(properties.determineUri());
MongoClientSettings.Builder builder = MongoClientSettings
.builder()
.streamFactoryFactory(NettyStreamFactory::new)
.applyToClusterSettings(b -> b.applyConnectionString(connectionString))
.applyToConnectionPoolSettings(b -> b.applyConnectionString(connectionString))
.applyToServerSettings(b -> b.applyConnectionString(connectionString))
.applyToSslSettings(b -> b.applyConnectionString(connectionString))
.applyToSocketSettings(b -> b.applyConnectionString(connectionString))
.codecRegistry(fromRegistries(
MongoClients.getDefaultCodecRegistry(),
fromProviders(PojoCodecProvider.builder()
.automatic(true)
.register(News.class)
.build())
));
if (connectionString.getReadPreference() != null) {
builder.readPreference(connectionString.getReadPreference());
}
if (connectionString.getReadConcern() != null) {
builder.readConcern(connectionString.getReadConcern());
}
if (connectionString.getWriteConcern() != null) {
builder.writeConcern(connectionString.getWriteConcern());
}
if (connectionString.getApplicationName() != null) {
builder.applicationName(connectionString.getApplicationName());
}
return MongoClients.create(builder.build());
}
default MongoClient mongoClient() {
ConnectionString connectionString = new ConnectionString("mongodb://localhost/news");
MongoClientSettings.Builder builder = MongoClientSettings.builder()
.streamFactoryFactory(NettyStreamFactory::new)
.applyToClusterSettings((cs) -> cs
.applyConnectionString(connectionString))
.applyToConnectionPoolSettings(cps -> cps
.applyConnectionString(connectionString))
.applyToServerSettings(ss -> ss
.applyConnectionString(connectionString))
// TODO: Do not work with JDK11 without the next line being commented (null is not allowed)
//.credential(connectionString.getCredential())
.applyToSslSettings(ss -> ss
.applyConnectionString(connectionString))
.applyToSocketSettings(ss -> ss
.applyConnectionString(connectionString))
.codecRegistry(fromRegistries(
MongoClients.getDefaultCodecRegistry(),
fromProviders(PojoCodecProvider.builder()
.automatic(true)
.register(News.class)
.build())
));
if (connectionString.getReadPreference() != null) {
builder.readPreference(connectionString.getReadPreference());
}
if (connectionString.getReadConcern() != null) {
builder.readConcern(connectionString.getReadConcern());
}
if (connectionString.getWriteConcern() != null) {
builder.writeConcern(connectionString.getWriteConcern());
}
if (connectionString.getApplicationName() != null) {
builder.applicationName(connectionString.getApplicationName());
}
return MongoClients.create(builder.build());
}
private static void initializeReplicaSet(final List<IMongodConfig> mongodConfigList) throws UnknownHostException {
final String arbitrerAddress = "mongodb://" + mongodConfigList.get(0).net().getServerAddress().getHostName() + ":"
+ mongodConfigList.get(0).net().getPort();
final MongoClientSettings mo = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(arbitrerAddress)).build();
try (MongoClient mongo = MongoClients.create(mo)) {
final MongoDatabase mongoAdminDB = mongo.getDatabase("admin");
Document cr = mongoAdminDB.runCommand(new Document("isMaster", 1));
LOGGER.infof("isMaster: %s", cr);
// Build replica set configuration settings
final Document rsConfiguration = buildReplicaSetConfiguration(mongodConfigList);
LOGGER.infof("replSetSettings: %s", rsConfiguration);
// Initialize replica set
cr = mongoAdminDB.runCommand(new Document("replSetInitiate", rsConfiguration));
LOGGER.infof("replSetInitiate: %s", cr);
// Check replica set status before to proceed
await()
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(1, TimeUnit.MINUTES)
.until(() -> {
Document result = mongoAdminDB.runCommand(new Document("replSetGetStatus", 1));
LOGGER.infof("replSetGetStatus: %s", result);
return !isReplicaSetStarted(result);
});
}
}
private static void initializeReplicaSet(final List<IMongodConfig> mongodConfigList) throws UnknownHostException {
final String arbitrerAddress = "mongodb://" + mongodConfigList.get(0).net().getServerAddress().getHostName() + ":"
+ mongodConfigList.get(0).net().getPort();
final MongoClientSettings mo = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(arbitrerAddress)).build();
try (MongoClient mongo = MongoClients.create(mo)) {
final MongoDatabase mongoAdminDB = mongo.getDatabase("admin");
Document cr = mongoAdminDB.runCommand(new Document("isMaster", 1));
LOGGER.infof("isMaster: %s", cr);
// Build replica set configuration settings
final Document rsConfiguration = buildReplicaSetConfiguration(mongodConfigList);
LOGGER.infof("replSetSettings: %s", rsConfiguration);
// Initialize replica set
cr = mongoAdminDB.runCommand(new Document("replSetInitiate", rsConfiguration));
LOGGER.infof("replSetInitiate: %s", cr);
// Check replica set status before to proceed
await()
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(1, TimeUnit.MINUTES)
.until(() -> {
Document result = mongoAdminDB.runCommand(new Document("replSetGetStatus", 1));
LOGGER.infof("replSetGetStatus: %s", result);
return !isReplicaSetStarted(result);
});
}
}
@Override
public void customize(ComponentProxyComponent component, Map<String, Object> options) {
MongoCustomizersUtil.replaceAdminDBIfMissing(options);
// Set connection parameter
if (!options.containsKey("mongoConnection")) {
if (options.containsKey("user") && options.containsKey("password")
&& options.containsKey("host")) {
ConnectionParamsConfiguration mongoConf = new ConnectionParamsConfiguration(cast(options));
// We need to force consumption in order to perform property placeholder done by Camel
consumeOption(camelContext, options, "password", String.class, mongoConf::setPassword);
LOGGER.debug("Creating and registering a client connection to {}", mongoConf);
MongoClientSettings.Builder settings = MongoClientSettings.builder();
MongoCredential credentials = MongoCredential.createCredential(
mongoConf.getUser(),
mongoConf.getAdminDB(),
mongoConf.getPassword().toCharArray());
ConnectionString uri = new ConnectionString(mongoConf.getMongoClientURI());
settings.applyConnectionString(uri);
settings.credential(credentials);
MongoClient mongoClient = MongoClients.create(settings.build());
options.put("mongoConnection", mongoClient);
if (!options.containsKey("connectionBean")) {
//We safely put a default name instead of leaving null
options.put("connectionBean", String.format("%s-%s", mongoConf.getHost(), mongoConf.getUser()));
}
} else {
LOGGER.warn(
"Not enough information provided to set-up the MongoDB client. Required at least host, user and " +
"password.");
}
}
}
private void doWrite(Collection<RunResult> results) throws ParseException {
Date now = new Date();
StandardEnvironment env = new StandardEnvironment();
String projectVersion = env.getProperty("project.version", "unknown");
String gitBranch = env.getProperty("git.branch", "unknown");
String gitDirty = env.getProperty("git.dirty", "no");
String gitCommitId = env.getProperty("git.commit.id", "unknown");
ConnectionString uri = new ConnectionString(this.uri);
MongoClient client = MongoClients.create();
String dbName = StringUtils.hasText(uri.getDatabase()) ? uri.getDatabase() : "spring-data-mongodb-benchmarks";
MongoDatabase db = client.getDatabase(dbName);
String resultsJson = ResultsWriter.jsonifyResults(results).trim();
JSONArray array = (JSONArray) new JSONParser(JSONParser.MODE_PERMISSIVE).parse(resultsJson);
for (Object object : array) {
JSONObject dbo = (JSONObject) object;
String collectionName = extractClass(dbo.get("benchmark").toString());
Document sink = new Document();
sink.append("_version", projectVersion);
sink.append("_branch", gitBranch);
sink.append("_commit", gitCommitId);
sink.append("_dirty", gitDirty);
sink.append("_method", extractBenchmarkName(dbo.get("benchmark").toString()));
sink.append("_date", now);
sink.append("_snapshot", projectVersion.toLowerCase().contains("snapshot"));
sink.putAll(dbo);
db.getCollection(collectionName).insertOne(fixDocumentKeys(sink));
}
client.close();
}
public void migrate(Consumer<Mongo> consumer) {
var mongo = new MongoImpl();
try {
mongo.uri = new ConnectionString(uri);
mongo.initialize();
consumer.accept(mongo);
} catch (Throwable e) {
logger.error("failed to run migration", e);
throw e;
} finally {
mongo.close();
}
}
@Override
@Bean
public com.mongodb.client.MongoClient mongoClient() {
final CodecRegistry pojoCodecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(),
fromProviders(PojoCodecProvider.builder().automatic(true).build()));
final String connectionsString = String.format("mongodb://%s:%[email protected]%s:%s", this.username, this.password, this.host, this.port);
final MongoClientSettings settings = MongoClientSettings.builder()
.codecRegistry(pojoCodecRegistry)
.applyConnectionString(new ConnectionString(
connectionsString))
.build();
return MongoClients.create(settings);
}
public ClusterSettingsParser(ConnectionString connectionString, JsonObject config) {
ClusterSettings.Builder settings = ClusterSettings.builder();
// ConnectionString takes precedence
if (connectionString != null) {
settings.applyConnectionString(connectionString);
} else {
// hosts
List<ServerAddress> hosts = parseHosts(config);
settings.hosts(hosts);
// replica set / mode
String replicaSet = config.getString("replicaSet");
if (hosts.size() == 1 && replicaSet == null) {
settings.mode(ClusterConnectionMode.SINGLE);
} else {
settings.mode(ClusterConnectionMode.MULTIPLE);
}
if (replicaSet != null) {
settings.requiredReplicaSetName(replicaSet);
}
// serverSelectionTimeoutMS
Long serverSelectionTimeoutMS = config.getLong("serverSelectionTimeoutMS");
if(serverSelectionTimeoutMS != null) {
settings.serverSelectionTimeout(serverSelectionTimeoutMS, MILLISECONDS);
}
}
this.settings = settings.build();
}
ReadPreferenceParser(ConnectionString connectionString, JsonObject config) {
ReadPreference connStringReadPreference = connectionString != null ? connectionString.getReadPreference() : null;
if (connStringReadPreference != null) {
// Prefer connection string's read preference
readPreference = connStringReadPreference;
} else {
ReadPreference rp;
String rps = config.getString("readPreference");
if (rps != null) {
JsonArray readPreferenceTags = config.getJsonArray("readPreferenceTags");
if (readPreferenceTags == null) {
rp = ReadPreference.valueOf(rps);
if (rp == null) throw new IllegalArgumentException("Invalid ReadPreference " + rps);
} else {
// Support advanced ReadPreference Tags
List<TagSet> tagSet = new ArrayList<>();
readPreferenceTags.forEach(o -> {
String tagString = (String) o;
List<Tag> tags = Stream.of(tagString.trim().split(","))
.map(s -> s.split(":"))
.filter(array -> {
if (array.length != 2) {
throw new IllegalArgumentException("Invalid readPreferenceTags value '" + tagString + "'");
}
return true;
}).map(array -> new Tag(array[0], array[1])).collect(Collectors.toList());
tagSet.add(new TagSet(tags));
});
rp = ReadPreference.valueOf(rps, tagSet);
}
} else {
rp = null;
}
readPreference = rp;
}
}
public ConnectionPoolSettingsParser(ConnectionString connectionString, JsonObject config) {
ConnectionPoolSettings.Builder settings = ConnectionPoolSettings.builder();
if (connectionString != null) {
settings.applyConnectionString(connectionString);
} else {
Integer maxPoolSize = config.getInteger("maxPoolSize");
if (maxPoolSize != null) {
settings.maxSize(maxPoolSize);
}
Integer minPoolSize = config.getInteger("minPoolSize");
if (minPoolSize != null) {
settings.minSize(minPoolSize);
}
Long maxIdleTimeMS = config.getLong("maxIdleTimeMS");
if (maxIdleTimeMS != null) {
settings.maxConnectionIdleTime(maxIdleTimeMS, MILLISECONDS);
}
Long maxLifeTimeMS = config.getLong("maxLifeTimeMS");
if (maxLifeTimeMS != null) {
settings.maxConnectionLifeTime(maxLifeTimeMS, MILLISECONDS);
}
Long waitQueueTimeoutMS = config.getLong("waitQueueTimeoutMS");
if (waitQueueTimeoutMS != null) {
settings.maxWaitTime(waitQueueTimeoutMS, MILLISECONDS);
}
Long maintenanceInitialDelayMS = config.getLong("maintenanceInitialDelayMS");
if (maintenanceInitialDelayMS != null) {
settings.maintenanceInitialDelay(maintenanceInitialDelayMS, MILLISECONDS);
}
Long maintenanceFrequencyMS = config.getLong("maintenanceFrequencyMS");
if (maintenanceFrequencyMS != null) {
settings.maintenanceFrequency(maintenanceFrequencyMS, MILLISECONDS);
}
}
this.settings = settings.build();
}
@Test
public void testConnStringWriteConcern() {
final ConnectionString connString = new ConnectionString("mongodb://localhost:27017/mydb?replicaSet=myapp&safe=true");
WriteConcern wc = new WriteConcernParser(connString, new JsonObject()).writeConcern();
assertNotNull(wc);
assertEquals(WriteConcern.ACKNOWLEDGED, wc);
}
@Test
public void testConnStringSimpleAndAdvancedWriteConcern() {
final ConnectionString connString = new ConnectionString("mongodb://localhost:27017/mydb?replicaSet=myapp" +
"&w=majority&wtimeoutms=20&journal=false");
WriteConcern expected = new WriteConcern("majority").withWTimeout(20, TimeUnit.MILLISECONDS).withJournal(false);
WriteConcern wc = new WriteConcernParser(connString, new JsonObject()).writeConcern();
assertNotNull(wc);
assertEquals(expected, wc);
}
@Test
public void testConnStringReadPreference() {
final ConnectionString connString = new ConnectionString("mongodb://localhost:27017/mydb?replicaSet=myapp&readPreference=primaryPreferred");
ReadPreference rp = new ReadPreferenceParser(connString, new JsonObject()).readPreference();
assertNotNull(rp);
assertEquals(ReadPreference.primaryPreferred(), rp);
}
public MongoDb4Connection(final ConnectionString connectionString, final MongoClient mongoClient,
final MongoDatabase mongoDatabase, final boolean isCapped, final Integer sizeInBytes) {
this.connectionString = connectionString;
this.mongoClient = mongoClient;
this.collection = getOrCreateMongoCollection(mongoDatabase, connectionString.getCollection(), isCapped,
sizeInBytes);
}
public MongoSinkConfig(final Map<String, String> originals) {
super(CONFIG, originals, false);
this.originals = unmodifiableMap(originals);
topics =
getList(TOPICS_CONFIG).isEmpty()
? Optional.empty()
: Optional.of(unmodifiableList(getList(TOPICS_CONFIG)));
topicsRegex =
getString(TOPICS_REGEX_CONFIG).isEmpty()
? Optional.empty()
: Optional.of(Pattern.compile(getString(TOPICS_REGEX_CONFIG)));
if (topics.isPresent() && topicsRegex.isPresent()) {
throw new ConfigException(
format(
"%s and %s are mutually exclusive options, but both are set.",
TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
} else if (!topics.isPresent() && !topicsRegex.isPresent()) {
throw new ConfigException(
format("Must configure one of %s or %s", TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
}
connectionString = new ConnectionString(getString(CONNECTION_URI_CONFIG));
topicSinkConnectorConfigMap =
new ConcurrentHashMap<>(
topics.orElse(emptyList()).stream()
.collect(
Collectors.toMap((t) -> t, (t) -> new MongoSinkTopicConfig(t, originals))));
// Process and validate overrides of regex values.
if (topicsRegex.isPresent()) {
originals.keySet().stream()
.filter(k -> k.startsWith(TOPIC_OVERRIDE_PREFIX))
.forEach(
k -> {
String topic = k.substring(TOPIC_OVERRIDE_PREFIX.length()).split("\\.")[0];
if (!topicSinkConnectorConfigMap.containsKey(topic)) {
topicSinkConnectorConfigMap.put(
topic, new MongoSinkTopicConfig(topic, originals));
}
});
}
}
public ConnectionString getConnectionString() {
return connectionString;
}
private static ConfigDef createConfigDef() {
ConfigDef configDef =
new ConfigDef() {
@Override
@SuppressWarnings("unchecked")
public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
Map<String, ConfigValue> results = super.validateAll(props);
// Don't validate child configs if the top level configs are broken
if (results.values().stream().anyMatch((c) -> !c.errorMessages().isEmpty())) {
return results;
}
boolean hasTopicsConfig = !props.getOrDefault(TOPICS_CONFIG, "").trim().isEmpty();
boolean hasTopicsRegexConfig =
!props.getOrDefault(TOPICS_REGEX_CONFIG, "").trim().isEmpty();
if (hasTopicsConfig && hasTopicsRegexConfig) {
results
.get(TOPICS_CONFIG)
.addErrorMessage(
format(
"%s and %s are mutually exclusive options, but both are set.",
TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
} else if (!hasTopicsConfig && !hasTopicsRegexConfig) {
results
.get(TOPICS_CONFIG)
.addErrorMessage(
format("Must configure one of %s or %s", TOPICS_CONFIG, TOPICS_REGEX_CONFIG));
}
if (hasTopicsConfig) {
List<String> topics = (List<String>) results.get(TOPICS_CONFIG).value();
topics.forEach(
topic -> results.putAll(MongoSinkTopicConfig.validateAll(topic, props)));
} else if (hasTopicsRegexConfig) {
results.putAll(MongoSinkTopicConfig.validateRegexAll(props));
}
return results;
}
};
String group = "Connection";
int orderInGroup = 0;
configDef.define(
TOPICS_CONFIG,
Type.LIST,
TOPICS_DEFAULT,
Importance.HIGH,
TOPICS_DOC,
group,
++orderInGroup,
Width.MEDIUM,
TOPICS_DISPLAY);
configDef.define(
TOPICS_REGEX_CONFIG,
Type.STRING,
TOPICS_REGEX_DEFAULT,
Validators.isAValidRegex(),
Importance.HIGH,
TOPICS_REGEX_DOC,
group,
++orderInGroup,
Width.MEDIUM,
TOPICS_REGEX_DISPLAY);
configDef.define(
CONNECTION_URI_CONFIG,
Type.STRING,
CONNECTION_URI_DEFAULT,
errorCheckingValueValidator("A valid connection string", ConnectionString::new),
Importance.HIGH,
CONNECTION_URI_DOC,
group,
++orderInGroup,
Width.MEDIUM,
CONNECTION_URI_DISPLAY);
group = "Overrides";
orderInGroup = 0;
configDef.define(
TOPIC_OVERRIDE_CONFIG,
Type.STRING,
TOPIC_OVERRIDE_DEFAULT,
Validators.topicOverrideValidator(),
Importance.LOW,
TOPIC_OVERRIDE_DOC,
group,
++orderInGroup,
Width.MEDIUM,
TOPIC_OVERRIDE_DISPLAY);
MongoSinkTopicConfig.BASE_CONFIG.configKeys().values().forEach(configDef::define);
return configDef;
}
public ConnectionString getConnectionString() {
return connectionString;
}
private ConnectionString getConnectionString() {
String mongoURIProperty = System.getProperty(URI_SYSTEM_PROPERTY_NAME);
String mongoURIString =
mongoURIProperty == null || mongoURIProperty.isEmpty() ? DEFAULT_URI : mongoURIProperty;
return new ConnectionString(mongoURIString);
}
private static String createUri(final boolean sslEnabled) {
final ConnectionString connectionString = new ConnectionString(
"mongodb://" + KNOWN_USER + ":" + KNOWN_PASSWORD + "@" + KNOWN_SERVER_ADDRESS + "/" + KNOWN_DB_NAME +
"?ssl=" + sslEnabled);
return connectionString.getConnectionString();
}
ConnectionString connectionString(ConnectionString uri) {
return uri;
}