下面列出了怎么用com.mongodb.ClientSessionOptions的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public Publisher<ClientSession> startSession(final ClientSessionOptions options) {
return new ObservableToPublisher<ClientSession>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<ClientSession>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<ClientSession> clientSessionSingleResultCallback) {
wrapped.startSession(options, new com.mongodb.async.SingleResultCallback<com.mongodb.async.client.ClientSession>() {
@Override
public void onResult(final com.mongodb.async.client.ClientSession result, final Throwable t) {
if (t != null) {
clientSessionSingleResultCallback.onResult(null, t);
} else {
clientSessionSingleResultCallback.onResult(new ClientSessionImpl(result, this), null);
}
}
});
}
}));
}
@Override
public boolean open() {
if (isOpen()) {
return true;
}
LOG.info("Initializing MongoDB at {}", mongoClientUri);
// Get the client and create a session for this instance
MongoClient mongoClient =
MongoConnectionManager.inst().getMongoClientInstance(this.mongoClientUri);
ClientSessionOptions sessionOptions =
ClientSessionOptions.builder()
.causallyConsistent(true)
.defaultTransactionOptions(
TransactionOptions.builder()
.readConcern(ReadConcern.DEFAULT)
.writeConcern(WriteConcern.MAJORITY)
.readPreference(ReadPreference.nearest())
.build())
.build();
this.clientSession = mongoClient.startSession(sessionOptions);
// Get the database and our collection. Mongo takes care of creating these if they don't
// exist
MongoDatabase mongoDb = mongoClient.getDatabase(MongoConstants.AION_DB_NAME);
// Gets the collection where we will be saving our values. Mongo creates it if it doesn't
// yet exist
this.collection = mongoDb.getCollection(this.name, BsonDocument.class);
LOG.info("Finished opening the Mongo connection");
return isOpen();
}
@Test
void testSessionCreation() {
// Session requires replicas
ClientSession session = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())
.await().indefinitely();
assertThat(session).isNotNull();
session.close();
}
@Override
public ClientSession getSession(ClientSessionOptions options) {
return this.mongoDbFactory.getSession(options);
}
@Override
public Uni<ClientSession> startSession(ClientSessionOptions options) {
return Wrappers.toUni(client.startSession(options));
}
@Override
public Publisher<ClientSession> startSession(final ClientSessionOptions options) {
return mongoClient.startSession(options);
}
@Override
public ClientSessionOptions getOptions() {
return wrapped.getOptions();
}
@Override
public Publisher<ClientSession> startSession() {
return startSession(ClientSessionOptions.builder().build());
}
@Override
public ClientSession getSession(ClientSessionOptions options) {
return dataSource.getClient().startSession();
}
@Override
public ClientSessionOptions getOptions() {
return session.getOptions();
}
@Override
public MorphiaSession startSession(final ClientSessionOptions options) {
return new MorphiaSessionImpl(mongoClient.startSession(options), mongoClient, database, mapper, queryFactory);
}
@Override
public <T> T withTransaction(final ClientSessionOptions options, final MorphiaTransaction<T> transaction) {
return doTransaction(startSession(options), transaction);
}
/**
* Creates a client session.
*
* @param options the options for the client session
* @return a {@link Uni} completed when the session is ready to be used.
*/
Uni<ClientSession> startSession(ClientSessionOptions options);
/**
* Creates a client session.
*
* @param options the options for the client session
* @return a publisher for the client session.
* @mongodb.server.release 3.6
* @since 1.7
*/
Publisher<ClientSession> startSession(ClientSessionOptions options);
/**
* Starts a new session on the server.
*
* @param options the options to apply
* @return the new session reference
* @morphia.experimental
* @since 2.0
*/
MorphiaSession startSession(ClientSessionOptions options);
/**
* @param <T> the return type
* @param options the session options to apply
* @param transaction the transaction wrapper
* @return the return value
* @morphia.experimental
* @since 2.0
*/
<T> T withTransaction(ClientSessionOptions options, MorphiaTransaction<T> transaction);