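// The snippets below are example usages of com.mongodb.client.model.Aggregates#com.mongodb.client.MongoClient; follow the links to GitHub to view the full source, or leave a comment in the panel on the right.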
/**
 * Builds a MongoClient for the deployment at HOST:PORT.
 *
 * @param useCredentials when {@code TRUE}, a credential built from USER, ADMIN_DB and PASSWORD is
 *                       attached; {@code null} is treated the same as {@code FALSE}
 * @param replicaSet     name of the replica set to put in the connection string, or {@code null}
 *                       to connect without a {@code replicaSet} option
 * @return a newly created client whose read preference is primaryPreferred
 */
private static MongoClient getClient(Boolean useCredentials, String replicaSet) {
    MongoClientSettings.Builder settings = MongoClientSettings.builder();
    // Boolean.TRUE.equals avoids a NullPointerException from auto-unboxing a null flag.
    if (Boolean.TRUE.equals(useCredentials)) {
        MongoCredential credentials = MongoCredential.createCredential(
                USER, ADMIN_DB, PASSWORD.toCharArray());
        settings.credential(credentials);
    }
    StringBuilder connectionString = new StringBuilder(String.format("mongodb://%s:%d", HOST, PORT));
    if (replicaSet != null) {
        // Use the caller-supplied replica set name; previously the argument was only
        // null-checked and the REPLICA_SET constant was always substituted.
        connectionString.append(String.format("/?replicaSet=%s", replicaSet));
    }
    ConnectionString uri = new ConnectionString(connectionString.toString());
    settings.applyConnectionString(uri);
    settings.readPreference(ReadPreference.primaryPreferred());
    return MongoClients.create(settings.build());
}
/**
 * Exposes a MongoClient bean, obtained from JNDI when {@code mongo.jndiName} is set and otherwise
 * created from {@code mongo.host}. The JNDI name takes precedence when both are configured.
 *
 * @param host     value passed to {@code MongoClients.create}; may be empty
 * @param jndiName JNDI name to look the client up under; may be empty
 * @return the looked-up or newly created client
 * @throws NamingException  if the JNDI lookup fails
 * @throws RuntimeException if neither property is set
 */
@Bean(destroyMethod = "")
public MongoClient mongoClient(@Value("${mongo.host:}") String host, @Value("${mongo.jndiName:}") String jndiName) throws NamingException
{
    boolean jndiConfigured = !Strings.isNullOrEmpty(jndiName);
    if (jndiConfigured)
    {
        return new JndiTemplate().lookup(jndiName, MongoClient.class);
    }

    if (Strings.isNullOrEmpty(host))
    {
        throw new RuntimeException("Either mongo.host or mongo.jndiName must be set");
    }
    return MongoClients.create(host);
}
/**
 * Logs a two-entry MapMessage, then reads the first document of testDb.testCollection and checks
 * that it carries the same two entries.
 */
@Test
public void test() {
    final Logger log = LogManager.getLogger();
    final MapMessage<?, Object> payload = new MapMessage<>();
    payload.with("SomeName", "SomeValue");
    payload.with("SomeInt", 1);
    log.info(payload);

    // NOTE(review): presumably the logging configuration routes this event to MongoDB - confirm.
    try (final MongoClient client = mongoDbTestRule.getMongoClient()) {
        final MongoDatabase testDb = client.getDatabase("testDb");
        Assert.assertNotNull(testDb);

        final MongoCollection<Document> testCollection = testDb.getCollection("testCollection");
        Assert.assertNotNull(testCollection);

        final Document stored = testCollection.find().first();
        Assert.assertNotNull(stored);

        // The JSON form of the document doubles as the assertion failure message.
        final String storedJson = stored.toJson();
        Assert.assertEquals(storedJson, "SomeValue", stored.getString("SomeName"));
        Assert.assertEquals(storedJson, Integer.valueOf(1), stored.getInteger("SomeInt"));
    }
}
/**
 * Captures the client, target collection and cleanup callback, and prepares the insert options
 * and an empty document buffer.
 *
 * @param client           client stored for later use
 * @param collection       collection stored for later use
 * @param destroyFn        callback stored for later use
 * @param ordered          forwarded to {@code InsertManyOptions.ordered}
 * @param bypassValidation forwarded to {@code InsertManyOptions.bypassDocumentValidation}
 */
MongoSinkContext(
        MongoClient client,
        MongoCollection<T> collection,
        ConsumerEx<MongoClient> destroyFn,
        boolean ordered,
        boolean bypassValidation
) {
    InsertManyOptions options = new InsertManyOptions()
            .ordered(ordered)
            .bypassDocumentValidation(bypassValidation);

    this.client = client;
    this.collection = collection;
    this.destroyFn = destroyFn;
    this.insertManyOptions = options;
    this.documents = new ArrayList<>();
}
/**
 * Resolves which namespaces to copy from the configured database/collection and immediately
 * submits one copy task per namespace to a fixed-size thread pool.
 *
 * <p>With no database configured every namespace returned by {@code getCollections(mongoClient)}
 * is copied; with a database but no collection, every namespace of that database; otherwise only
 * the single configured namespace.
 *
 * @param sourceConfig connector configuration supplying namespace, queue size and thread limits
 * @param mongoClient  client used to enumerate namespaces
 */
MongoCopyDataManager(final MongoSourceConfig sourceConfig, final MongoClient mongoClient) {
    this.sourceConfig = sourceConfig;
    this.mongoClient = mongoClient;

    final String databaseName = sourceConfig.getString(DATABASE_CONFIG);
    final String collectionName = sourceConfig.getString(COLLECTION_CONFIG);

    final List<MongoNamespace> targets;
    if (!databaseName.isEmpty() && !collectionName.isEmpty()) {
        targets = singletonList(createNamespace(databaseName, collectionName));
    } else if (!databaseName.isEmpty()) {
        targets = getCollections(mongoClient, databaseName);
    } else {
        targets = getCollections(mongoClient);
    }
    LOGGER.info("Copying existing data on the following namespaces: {}", targets);

    namespacesToCopy = new AtomicInteger(targets.size());
    queue = new ArrayBlockingQueue<>(sourceConfig.getInt(COPY_EXISTING_QUEUE_SIZE_CONFIG));

    // At least one thread, and never more threads than namespaces or the configured maximum.
    final int boundedThreads =
            Math.min(targets.size(), sourceConfig.getInt(COPY_EXISTING_MAX_THREADS_CONFIG));
    executor = Executors.newFixedThreadPool(Math.max(1, boundedThreads));

    for (final MongoNamespace target : targets) {
        executor.submit(() -> copyDataFrom(target));
    }
}
/**
 * Checks the connection-pool gauges before any command, after one command, and after closing the
 * client, then checks which client beans are present in the container.
 */
@Test
void testMetricsInitialization() {
    // Clients are created eagerly, so both gauges are expected to read zero before any command runs.
    assertEquals(0L, getGaugeValueOrNull("mongodb.connection-pool.size", getTags()));
    assertEquals(0L, getGaugeValueOrNull("mongodb.connection-pool.checked-out-count", getTags()));

    // Run any command so that a connection gets opened; the result itself is not needed.
    String ignored = client.listDatabaseNames().first();
    assertEquals(1L, getGaugeValueOrNull("mongodb.connection-pool.size", getTags()));
    assertEquals(0L, getGaugeValueOrNull("mongodb.connection-pool.checked-out-count", getTags()));

    client.close();
    assertEquals(0L, getGaugeValueOrNull("mongodb.connection-pool.size", getTags()));
    assertEquals(0L, getGaugeValueOrNull("mongodb.connection-pool.checked-out-count", getTags()));

    // Checked here rather than in a separate test method to avoid disturbing the initialization stats.
    assertThat(Arc.container().instance(MongoClient.class).get()).isNotNull();
    assertThat(Arc.container().instance(ReactiveMongoClient.class).get()).isNull();
}
/**
 * Pings the {@code admin} database of every registered client.
 *
 * @return an UP response carrying each client's ping reply as JSON, or a DOWN response with a
 *         {@code reason} entry as soon as any ping throws
 */
@Override
public HealthCheckResponse call() {
    HealthCheckResponseBuilder builder = HealthCheckResponse.named("MongoDB connection health check").up();
    for (Map.Entry<String, MongoClient> entry : clients.entrySet()) {
        String clientKey = entry.getKey();
        // The default client is reported under the fixed name "default".
        String reportedName = DEFAULT_CLIENT.equals(clientKey) ? "default" : clientKey;
        MongoClient mongoClient = entry.getValue();
        try {
            Document pingReply = mongoClient.getDatabase("admin").runCommand(new Document("ping", 1));
            builder.up().withData(reportedName, pingReply.toJson());
        } catch (Exception e) {
            // First failure wins: remaining clients are not checked.
            return builder.down().withData("reason", e.getMessage()).build();
        }
    }
    return builder.build();
}
/**
 * Creates a client from settings built with the given database path and codec registry.
 *
 * @param dbPath        value passed to the settings builder's {@code dbPath}
 * @param codecRegistry value passed to the settings builder's {@code codecRegistry}
 * @return the client returned by {@code MongoClients.create}
 */
@Override
protected MongoClient createClient(final String dbPath, final CodecRegistry codecRegistry) {
    final MongoClientSettings settings = MongoClientSettings.builder()
            .dbPath(dbPath)
            .codecRegistry(codecRegistry)
            .build();
    return MongoClients.create(settings);
}
/**
 * Runs a 'quick' test on the connection and reports whether it passed.
 *
 * @param conn the client to probe
 * @return true if {@code listDatabaseNames()} returned without throwing a RuntimeException,
 *         false otherwise
 */
private boolean connectionTest(MongoClient conn)
{
    boolean healthy;
    try {
        logger.info("connectionTest: Testing connection started.");
        // NOTE(review): the returned iterable is never consumed; confirm that this call alone
        // actually contacts the server.
        conn.listDatabaseNames();
        logger.info("connectionTest: Testing connection completed - success.");
        healthy = true;
    }
    catch (RuntimeException ex) {
        logger.warn("getOrCreateConn: Exception while testing existing connection.", ex);
        logger.info("connectionTest: Testing connection completed - fail.");
        healthy = false;
    }
    return healthy;
}
/**
 * Reads the DOCDB_CONN_STR property from the given split, passes it through
 * {@code resolveSecrets}, and asks the connection factory for a client for the result.
 *
 * @param split The split being read; it must carry the DOCDB_CONN_STR property.
 * @return The MongoClient returned by the connection factory for the resolved connection string.
 * @throws RuntimeException if the split has no DOCDB_CONN_STR property.
 * @note Secrets referenced in the connection string as ${secret_name} are expected to be
 * substituted by {@code resolveSecrets} before the factory is called.
 */
private MongoClient getOrCreateConn(Split split)
{
    final String rawConnectionString = split.getProperty(DOCDB_CONN_STR);
    if (rawConnectionString != null) {
        final String resolved = resolveSecrets(rawConnectionString);
        return connectionFactory.getOrCreateConn(resolved);
    }
    throw new RuntimeException(DOCDB_CONN_STR + " Split property is null! Unable to create connection.");
}
/**
 * Binds the service to the {@code config} collection of the configured database and requests a
 * unique ascending index on {@code _userId}.
 *
 * @param mongoClient  client used to reach the database
 * @param databaseName name of the database, injected from {@code mongo.database}
 */
@Autowired
public ConfigService(
        MongoClient mongoClient,
        @Value("${mongo.database}") String databaseName
)
{
    MongoCollection<Document> configCollection =
            mongoClient.getDatabase(databaseName).getCollection("config");
    this.mongoCollection = configCollection;

    // Create unique index on _userId
    IndexOptions uniqueIndex = new IndexOptions().unique(true);
    configCollection.createIndex(Indexes.ascending("_userId"), uniqueIndex);
}
/**
 * If Glue is enabled as a source of supplemental metadata we look up the requested Schema/Table in Glue and
 * filter out any results that don't match TABLE_FILTER. If no schema was obtained from Glue (Glue is not
 * configured, or the lookup threw a RuntimeException), we fall back to inferring the schema of the collection
 * using SchemaUtils.inferSchema(...) over SCHEMA_INFERRENCE_NUM_DOCS.
 *
 * @param blockAllocator allocator forwarded to the Glue lookup
 * @param request        identifies the catalog and table whose schema is wanted
 * @return a response carrying the request's catalog name and table name plus the resolved schema
 * @throws Exception if the Glue lookup throws a checked exception or schema inference fails
 * @see GlueMetadataHandler
 */
@Override
public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableRequest request)
        throws Exception
{
    // The placeholder was missing, so the table name argument was silently dropped from the log.
    logger.info("doGetTable: enter - {}", request.getTableName());
    Schema schema = null;
    try {
        if (glue != null) {
            schema = super.doGetTable(blockAllocator, request, TABLE_FILTER).getSchema();
            logger.info("doGetTable: Retrieved schema for table[{}] from AWS Glue.", request.getTableName());
        }
    }
    catch (RuntimeException ex) {
        // A Glue failure is not fatal: log it and fall through to schema inference below.
        logger.warn("doGetTable: Unable to retrieve table[{}:{}] from AWS Glue.",
                request.getTableName().getSchemaName(),
                request.getTableName().getTableName(),
                ex);
    }
    if (schema == null) {
        logger.info("doGetTable: Inferring schema for table[{}].", request.getTableName());
        MongoClient client = getOrCreateConn(request);
        schema = SchemaUtils.inferSchema(client, request.getTableName(), SCHEMA_INFERRENCE_NUM_DOCS);
    }
    return new GetTableResponse(request.getCatalogName(), request.getTableName(), schema);
}
/**
 * A client registered under a connection string is handed back by getOrCreateConn for that same
 * string, after exactly one listDatabaseNames() call on it.
 */
@Test
public void clientCacheHitTest()
        throws IOException
{
    MongoClient cachedClient = mock(MongoClient.class);
    when(cachedClient.listDatabaseNames()).thenReturn(null);
    connectionFactory.addConnection("conStr", cachedClient);

    MongoClient resolved = connectionFactory.getOrCreateConn("conStr");

    assertEquals(cachedClient, resolved);
    verify(cachedClient, times(1)).listDatabaseNames();
}
/**
 * Delete a database for a given path and userId.
 *
 * <p>Every database visible to the client is dropped on a best-effort basis, then the client is
 * closed and removed from the factory, and finally the database path itself is deleted. The
 * client is closed and removed even if listing the databases fails.
 *
 * @param appInfo the info for this application
 * @param serviceName the name of the associated service
 * @param clientFactory the associated factory that creates clients
 * @param userId the id of the user whose database is deleted
 * @return true if the database path was successfully deleted, false if not
 * @throws IllegalArgumentException if {@code appInfo} has no data directory
 */
public static boolean deleteDatabase(final StitchAppClientInfo appInfo,
                                     final String serviceName,
                                     final EmbeddedMongoClientFactory clientFactory,
                                     final String userId) {
    final String dataDir = appInfo.getDataDirectory();
    if (dataDir == null) {
        throw new IllegalArgumentException("StitchAppClient not configured with a data directory");
    }
    final String instanceKey = String.format(
            "%s-%s_sync_%s_%s", appInfo.getClientAppId(), dataDir, serviceName, userId);
    final String dbPath = String.format(
            "%s/%s/sync_mongodb_%s/%s/0/", dataDir, appInfo.getClientAppId(), serviceName, userId);
    final MongoClient client =
            clientFactory.getClient(instanceKey, dbPath, appInfo.getCodecRegistry());
    try {
        for (final String listDatabaseName : client.listDatabaseNames()) {
            try {
                client.getDatabase(listDatabaseName).drop();
            } catch (Exception ignored) {
                // Deliberately best-effort: a database that cannot be dropped must not stop the
                // remaining databases from being dropped.
            }
        }
    } finally {
        // Release the client and its factory entry even when enumeration above throws, so a
        // failed delete does not leave an open client registered under instanceKey.
        try {
            client.close();
        } finally {
            clientFactory.removeClient(instanceKey);
        }
    }
    // NOTE(review): File.delete() only removes a directory when it is empty - confirm that the
    // path is empty at this point, otherwise this returns false.
    return new File(dbPath).delete();
}
/**
 * Creates a sink builder with the given name and connection supplier. The supplier is verified
 * to be serializable before anything is stored.
 *
 * See {@link MongoDBSinks#builder(String, SupplierEx)}
 */
MongoDBSinkBuilder(
        @Nonnull String name,
        @Nonnull SupplierEx<MongoClient> connectionSupplier
) {
    checkSerializable(connectionSupplier, "connectionSupplier");

    this.connectionSupplier = connectionSupplier;
    this.name = name;
}
/**
 * Creates a source builder with the given name and connection supplier. The supplier is verified
 * to be serializable before anything is stored.
 *
 * @param name               name stored on the builder
 * @param connectionSupplier supplier of the MongoClient; must be serializable
 */
private MongoDBSourceBuilder(
        @Nonnull String name,
        @Nonnull SupplierEx<? extends MongoClient> connectionSupplier
) {
    checkSerializable(connectionSupplier, "connectionSupplier");

    this.connectionSupplier = connectionSupplier;
    this.name = name;
}
/**
 * Obtains the local client for an application from the factory, keyed by the client app id and
 * data directory.
 *
 * @param appInfo       application info supplying the data directory, app id and codec registry
 * @param clientFactory factory the client is requested from
 * @return the client returned by the factory for the derived key and database path
 * @throws IllegalArgumentException if {@code appInfo} has no data directory
 */
public static MongoClient getClient(
        final StitchAppClientInfo appInfo,
        final EmbeddedMongoClientFactory clientFactory
) {
    final String dataDirectory = appInfo.getDataDirectory();
    if (dataDirectory == null) {
        throw new IllegalArgumentException("StitchAppClient not configured with a data directory");
    }

    final String factoryKey =
            String.format("local-%s-%s", appInfo.getClientAppId(), dataDirectory);
    final String localDbPath =
            String.format("%s/%s/local_mongodb/0/", dataDirectory, appInfo.getClientAppId());

    return clientFactory.getClient(factoryKey, localDbPath, appInfo.getCodecRegistry());
}
/**
 * Returns a supplier that, on each call, obtains a client, derives the database and the change
 * stream from it, and wraps everything in a new StreamContext.
 *
 * @param connectionSupplier     supplies the client
 * @param databaseFn             maps the client to the database to watch
 * @param destroyFn              passed through to the StreamContext
 * @param searchFn               maps the database to the change stream
 * @param mapFn                  passed through to the StreamContext
 * @param startAtOperationTimeFn passed through to the StreamContext
 * @return a supplier of StreamContext instances
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        FunctionEx<? super MongoClient, ? extends MongoDatabase> databaseFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoDatabase, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        ChangeStreamIterable<? extends T> changeStream =
                searchFn.apply(databaseFn.apply(mongoClient));
        return new StreamContext<>(mongoClient, changeStream, mapFn, destroyFn, startAtOperationTimeFn);
    };
}
/**
 * Creates a datastore bound to the named database.
 *
 * <p>The Mapper is built from the database's own codec registry, and the database handle kept by
 * this datastore is then re-derived with the Mapper's codec registry.
 *
 * @param mongoClient client used to obtain the database
 * @param options     mapper options; also the source of the query factory
 * @param dbName      name of the database to use
 */
protected DatastoreImpl(final MongoClient mongoClient, final MapperOptions options, final String dbName) {
    this.mongoClient = mongoClient;

    final MongoDatabase rawDatabase = mongoClient.getDatabase(dbName);
    this.mapper = new Mapper(this, rawDatabase.getCodecRegistry(), options);
    this.database = rawDatabase.withCodecRegistry(this.mapper.getCodecRegistry());

    this.queryFactory = options.getQueryFactory();
}
/**
 * Prepares three MongoTemplate instances over a mocked client (no application context, an empty
 * context, and a context built from EntityCallbackConfig) plus a sample Person to operate on.
 */
@Setup
public void setUp() {
    // Mocked driver objects: any database name yields the mock database, and any
    // Document-typed collection lookup yields the mock collection.
    MongoClient mockClient = mock(MongoClient.class);
    MongoDatabase mockDatabase = mock(MongoDatabase.class);
    MongoCollection<Document> mockCollection = mock(MongoCollection.class);
    when(mockClient.getDatabase(anyString())).thenReturn(mockDatabase);
    when(mockDatabase.getCollection(anyString(), eq(Document.class))).thenReturn(mockCollection);

    MongoDatabaseFactory databaseFactory = new SimpleMongoClientDatabaseFactory(mockClient, "mock-database");

    templateWithoutContext = new MongoTemplate(databaseFactory);

    templateWithEmptyContext = new MongoTemplate(databaseFactory);
    StaticApplicationContext emptyContext = new StaticApplicationContext();
    emptyContext.refresh();
    templateWithEmptyContext.setApplicationContext(emptyContext);

    templateWithContext = new MongoTemplate(databaseFactory);
    templateWithContext.setApplicationContext(new AnnotationConfigApplicationContext(EntityCallbackConfig.class));

    source = new Person();
    source.id = "luke-skywalker";
    source.firstname = "luke";
    source.lastname = "skywalker";

    source.address = new Address();
    source.address.street = "melenium falcon 1";
    source.address.city = "deathstar";
}
BatchContext(
MongoClient client,
MongoCollection<? extends T> collection,
FunctionEx<? super MongoCollection<? extends T>, ? extends FindIterable<? extends T>> searchFn,
FunctionEx<? super T, U> mapFn,
ConsumerEx<? super MongoClient> destroyFn
) {
this.client = client;
this.mapFn = mapFn;
this.destroyFn = destroyFn;
cursor = searchFn.apply(collection).iterator();
}
/**
 * Stores the client, change stream, mapping function and cleanup callback, and computes the
 * starting timestamp.
 *
 * @param client                 client stored for later use
 * @param changeStreamIterable   change stream stored for later use
 * @param mapFn                  mapping function stored for later use
 * @param destroyFn              callback stored for later use
 * @param startAtOperationTimeFn applied to the client to obtain the timestamp; when {@code null}
 *                               the timestamp is left {@code null}
 */
StreamContext(
        MongoClient client,
        ChangeStreamIterable<? extends T> changeStreamIterable,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    this.client = client;
    this.changeStreamIterable = changeStreamIterable;
    this.mapFn = mapFn;
    this.destroyFn = destroyFn;

    if (startAtOperationTimeFn == null) {
        this.timestamp = null;
    } else {
        this.timestamp = startAtOperationTimeFn.apply(client);
    }
}
/**
 * Creates a client from settings built with the given database path and codec registry.
 *
 * @param dbPath        value passed to the settings builder's {@code dbPath}
 * @param codecRegistry value passed to the settings builder's {@code codecRegistry}
 * @return the client returned by {@code MongoClients.create}
 */
@Override
protected MongoClient createClient(final String dbPath, final CodecRegistry codecRegistry) {
    final MongoClientSettings clientSettings = MongoClientSettings.builder()
            .dbPath(dbPath)
            .codecRegistry(codecRegistry)
            .build();
    return MongoClients.create(clientSettings);
}
/**
 * Creates a client for the given connection string with a custom server selection timeout.
 *
 * @param connectionString         MongoDB connection string to apply to the settings
 * @param connectionTimeoutSeconds server selection timeout, in seconds
 * @return a newly created client
 */
static MongoClient mongoClient(String connectionString, int connectionTimeoutSeconds) {
    return MongoClients.create(
            MongoClientSettings
                    .builder()
                    .applyConnectionString(new ConnectionString(connectionString))
                    .applyToClusterSettings(cluster ->
                            cluster.serverSelectionTimeout(connectionTimeoutSeconds, SECONDS))
                    .build());
}
/**
 * Returns the cached client, creating it from the sink configuration's connection string (with
 * this connector's driver information) on first use.
 *
 * @return the cached or newly created client
 */
private MongoClient getMongoClient() {
    if (mongoClient != null) {
        return mongoClient;
    }
    mongoClient =
            MongoClients.create(
                    sinkConfig.getConnectionString(), getMongoDriverInformation(CONNECTOR_TYPE));
    return mongoClient;
}
/**
 * Lists the namespaces of every database on the deployment, skipping databases whose name starts
 * with {@code admin}, {@code config} or {@code local}.
 *
 * @param mongoClient client used to enumerate databases and their collections
 * @return the namespaces of all remaining databases, in database listing order
 */
private static List<MongoNamespace> getCollections(final MongoClient mongoClient) {
    // Materialize the database names first, as the per-database lookup below uses the client again.
    final List<String> databaseNames = mongoClient.listDatabaseNames().into(new ArrayList<String>());

    final List<MongoNamespace> namespaces = new ArrayList<>();
    for (final String databaseName : databaseNames) {
        // NOTE(review): this is a prefix match, so a user database such as "localdata" is
        // skipped too - confirm that is intended.
        final boolean excluded =
                databaseName.startsWith("admin")
                        || databaseName.startsWith("config")
                        || databaseName.startsWith("local");
        if (excluded) {
            continue;
        }
        namespaces.addAll(getCollections(mongoClient, databaseName));
    }
    return namespaces;
}
/**
 * Tries to open a change stream cursor, resuming from {@code resumeToken} when one is given.
 *
 * <p>Resumption uses {@code startAfter} while {@code supportsStartAfter} is set, falls back to
 * {@code resumeAfter} while the cursor has not been flagged as invalidated, and otherwise starts
 * a fresh stream. Certain MongoCommandExceptions trigger a recursive retry with adjusted state
 * (see the catch block); any other MongoCommandException is logged and yields {@code null}.
 *
 * @param sourceConfig configuration used to build the change stream
 * @param mongoClient client the change stream is opened on
 * @param resumeToken token to resume from, or {@code null} to start without an offset
 * @return the opened cursor, or {@code null} if the change stream could not be created
 */
private MongoCursor<BsonDocument> tryCreateCursor(
final MongoSourceConfig sourceConfig,
final MongoClient mongoClient,
final BsonDocument resumeToken) {
try {
ChangeStreamIterable<Document> changeStreamIterable =
getChangeStreamIterable(sourceConfig, mongoClient);
if (resumeToken != null && supportsStartAfter) {
LOGGER.info("Resuming the change stream after the previous offset: {}", resumeToken);
changeStreamIterable.startAfter(resumeToken);
} else if (resumeToken != null && !invalidatedCursor) {
LOGGER.info("Resuming the change stream after the previous offset using resumeAfter.");
changeStreamIterable.resumeAfter(resumeToken);
} else {
LOGGER.info("New change stream cursor created without offset.");
}
return changeStreamIterable.withDocumentClass(BsonDocument.class).iterator();
} catch (MongoCommandException e) {
// Retries are only attempted when we were actually trying to resume.
if (resumeToken != null) {
// NOTE(review): 260 is presumably the server's "invalid resume token" error - confirm.
// The token is abandoned and the stream restarted without an offset.
if (e.getErrorCode() == 260) {
invalidatedCursor = true;
return tryCreateCursor(sourceConfig, mongoClient, null);
// NOTE(review): 9 / 40415 with "startAfter" in the message presumably mean the server
// rejected the startAfter option - confirm. Retry with the same token; with
// supportsStartAfter cleared the retry can only take the resumeAfter or no-offset branch.
} else if ((e.getErrorCode() == 9 || e.getErrorCode() == 40415)
&& e.getErrorMessage().contains("startAfter")) {
supportsStartAfter = false;
return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
}
}
// Anything else is treated as "no cursor available"; the caller receives null.
LOGGER.info("Failed to resume change stream: {} {}", e.getErrorMessage(), e.getErrorCode());
return null;
}
}
/**
 * Builds the change stream described by the source configuration.
 *
 * <p>The scope is the whole cluster when no database is configured, a single database when no
 * collection is configured, and a single collection otherwise. The configured pipeline, batch
 * size (when positive), full-document option and collation are applied when present.
 *
 * @param sourceConfig configuration supplying scope, pipeline and stream options
 * @param mongoClient  client the change stream is created from
 * @return the configured change stream
 */
private ChangeStreamIterable<Document> getChangeStreamIterable(
        final MongoSourceConfig sourceConfig, final MongoClient mongoClient) {
    final String databaseName = sourceConfig.getString(DATABASE_CONFIG);
    final String collectionName = sourceConfig.getString(COLLECTION_CONFIG);
    final Optional<List<Document>> pipeline = sourceConfig.getPipeline();

    final ChangeStreamIterable<Document> stream;
    if (!databaseName.isEmpty() && !collectionName.isEmpty()) {
        LOGGER.info("Watching for collection changes on '{}.{}'", databaseName, collectionName);
        MongoCollection<Document> coll =
                mongoClient.getDatabase(databaseName).getCollection(collectionName);
        stream = pipeline.map(coll::watch).orElse(coll.watch());
    } else if (!databaseName.isEmpty()) {
        LOGGER.info("Watching for database changes on '{}'", databaseName);
        MongoDatabase db = mongoClient.getDatabase(databaseName);
        stream = pipeline.map(db::watch).orElse(db.watch());
    } else {
        LOGGER.info("Watching all changes on the cluster");
        stream = pipeline.map(mongoClient::watch).orElse(mongoClient.watch());
    }

    // A non-positive batch size leaves the stream's batch size untouched.
    final int configuredBatchSize = sourceConfig.getInt(BATCH_SIZE_CONFIG);
    if (configuredBatchSize > 0) {
        stream.batchSize(configuredBatchSize);
    }
    sourceConfig.getFullDocument().ifPresent(stream::fullDocument);
    sourceConfig.getCollation().ifPresent(stream::collation);
    return stream;
}
/**
 * Runs {@code isMaster} against the {@code admin} database over a short-lived client and inspects
 * the reply.
 *
 * @return true if the reply has a {@code setName} key or its {@code msg} equals {@code isdbgrid};
 *         false otherwise, including when any exception is thrown
 */
private boolean isReplicaSetOrSharded() {
    try (MongoClient probeClient = MongoClients.create(getConnectionString())) {
        final Document reply =
                probeClient.getDatabase("admin").runCommand(BsonDocument.parse("{isMaster: 1}"));
        if (reply.containsKey("setName")) {
            return true;
        }
        return reply.get("msg", "").equals("isdbgrid");
    } catch (Exception e) {
        // Any failure to probe is reported as "neither replica set nor sharded".
        return false;
    }
}
/**
 * Opens this instance: obtains the client for {@code mongoClientUri}, starts a causally
 * consistent session, and binds the BsonDocument collection named after this instance in the
 * AION database. Does nothing when already open.
 *
 * @return true if already open; otherwise the result of {@code isOpen()} after initialization
 */
@Override
public boolean open() {
    if (isOpen()) {
        return true;
    }

    LOG.info("Initializing MongoDB at {}", mongoClientUri);

    // Get the client and create a session for this instance
    final MongoClient client =
            MongoConnectionManager.inst().getMongoClientInstance(this.mongoClientUri);

    final TransactionOptions transactionDefaults =
            TransactionOptions.builder()
                    .readConcern(ReadConcern.DEFAULT)
                    .writeConcern(WriteConcern.MAJORITY)
                    .readPreference(ReadPreference.nearest())
                    .build();
    final ClientSessionOptions sessionOptions =
            ClientSessionOptions.builder()
                    .causallyConsistent(true)
                    .defaultTransactionOptions(transactionDefaults)
                    .build();
    this.clientSession = client.startSession(sessionOptions);

    // Get the database and then the collection our values are saved in. Mongo takes care of
    // creating these if they don't exist yet.
    final MongoDatabase aionDatabase = client.getDatabase(MongoConstants.AION_DB_NAME);
    this.collection = aionDatabase.getCollection(this.name, BsonDocument.class);

    LOG.info("Finished opening the Mongo connection");
    return isOpen();
}