下面列出了怎么用com.mongodb.client.ClientSession的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Executes the update
*
* @param options the options to apply
* @return the results
*/
public UpdateResult execute(final UpdateOptions options) {
Document updateOperations = toDocument();
final Document queryObject = getQuery().toDocument();
ClientSession session = getDatastore().findSession(options);
MongoCollection<T> mongoCollection = options.prepare(getCollection());
if (options.isMulti()) {
return session == null ? mongoCollection.updateMany(queryObject, updateOperations, options)
: mongoCollection.updateMany(session, queryObject, updateOperations, options);
} else {
return session == null ? mongoCollection.updateOne(queryObject, updateOperations, options)
: mongoCollection.updateOne(session, queryObject, updateOperations, options);
}
}
protected <T> void saveDocument(final T entity, final MongoCollection<T> collection, final InsertOneOptions options) {
Object id = mapper.getMappedClass(entity.getClass()).getIdField().getFieldValue(entity);
ClientSession clientSession = findSession(options);
if (id == null) {
if (clientSession == null) {
options.prepare(collection).insertOne(entity, options.getOptions());
} else {
options.prepare(collection).insertOne(clientSession, entity, options.getOptions());
}
} else {
ReplaceOptions updateOptions = new ReplaceOptions()
.bypassDocumentValidation(options.getBypassDocumentValidation())
.upsert(true);
MongoCollection<T> updated = collection;
if (options.writeConcern() != null) {
updated = collection.withWriteConcern(options.writeConcern());
}
if (clientSession == null) {
updated.replaceOne(new Document("_id", id), entity, updateOptions);
} else {
updated.replaceOne(clientSession, new Document("_id", id), entity, updateOptions);
}
}
}
private void insertEdgeWithId(MongoCollection edgeCollection, ObjectId id, User user, User toFollow, ClientSession session) {
// try {
edgeCollection.insertOne( session, makeEdgeWithId(id, user, toFollow));
// } catch( MongoCommandException e ) {
// if (e.getErrorCode() != 11000) {
// throw e; // System.err.println(e.getErrorMessage());
// } else {
// inserting duplicate edge is fine. keep going.
// System.out.println("Duplicate key when inserting follow");
// }
// }
}
BaseMorphiaSession(final ClientSession session,
final MongoClient mongoClient,
final MongoDatabase database,
final Mapper mapper,
final QueryFactory queryFactory) {
super(database, mongoClient, mapper, queryFactory);
this.session = session;
}
@Override
public DeleteResult delete(final DeleteOptions options) {
MongoCollection<T> collection = options.prepare(getCollection());
ClientSession session = datastore.findSession(options);
if (options.isMulti()) {
return session == null
? collection.deleteMany(getQueryDocument(), options)
: collection.deleteMany(session, getQueryDocument(), options);
} else {
return session == null
? collection.deleteOne(getQueryDocument(), options)
: collection.deleteOne(session, getQueryDocument(), options);
}
}
@Override
public T findAndDelete(final FindAndDeleteOptions options) {
MongoCollection<T> mongoCollection = options.prepare(getCollection());
ClientSession session = datastore.findSession(options);
return session == null
? mongoCollection.findOneAndDelete(getQueryDocument(), options)
: mongoCollection.findOneAndDelete(session, getQueryDocument(), options);
}
private <E> MongoCursor<E> prepareCursor(final FindOptions options, final MongoCollection<E> collection) {
final Document query = this.toDocument();
FindOptions findOptions = getOptions().copy().copy(options);
if (LOG.isTraceEnabled()) {
LOG.trace(format("Running query(%s) : %s, options: %s,", getCollectionName(), query, findOptions));
}
if ((findOptions.getCursorType() != null && findOptions.getCursorType() != NonTailable)
&& (findOptions.getSort() != null)) {
LOG.warn("Sorting on tail is not allowed.");
}
ClientSession clientSession = datastore.findSession(findOptions);
FindIterable<E> iterable = clientSession != null
? collection.find(clientSession, query)
: collection.find(query);
Document oldProfile = null;
if (findOptions.isLogQuery()) {
oldProfile = datastore.getDatabase().runCommand(new Document("profile", 2).append("slowms", 0));
}
try {
return findOptions
.apply(iterable, mapper, clazz)
.iterator();
} finally {
if (findOptions.isLogQuery()) {
datastore.getDatabase().runCommand(new Document("profile", oldProfile.get("was"))
.append("slowms", oldProfile.get("slowms"))
.append("sampleRate", oldProfile.get("sampleRate")));
}
}
}
@Override
public long count(final CountOptions options) {
ClientSession session = datastore.findSession(options);
Document query = getQueryDocument();
return session == null ? getCollection().countDocuments(query, options)
: getCollection().countDocuments(session, query, options);
}
@Override
public DeleteResult delete(final DeleteOptions options) {
MongoCollection<T> collection = options.prepare(getCollection());
ClientSession session = datastore.findSession(options);
if (options.isMulti()) {
return session == null
? collection.deleteMany(getQueryDocument(), options)
: collection.deleteMany(session, getQueryDocument(), options);
} else {
return session == null
? collection.deleteOne(getQueryDocument(), options)
: collection.deleteOne(session, getQueryDocument(), options);
}
}
@Override
public T findAndDelete(final FindAndDeleteOptions options) {
MongoCollection<T> mongoCollection = options.prepare(getCollection());
ClientSession session = datastore.findSession(options);
return session == null
? mongoCollection.findOneAndDelete(getQueryDocument(), options)
: mongoCollection.findOneAndDelete(session, getQueryDocument(), options);
}
@SuppressWarnings("ConstantConditions")
private <E> MongoCursor<E> prepareCursor(final FindOptions findOptions, final MongoCollection<E> collection) {
final Document query = toDocument();
if (LOG.isTraceEnabled()) {
LOG.trace(format("Running query(%s) : %s, options: %s,", getCollectionName(), query, findOptions));
}
if ((findOptions.getCursorType() != null && findOptions.getCursorType() != NonTailable)
&& (findOptions.getSort() != null)) {
LOG.warn("Sorting on tail is not allowed.");
}
ClientSession clientSession = datastore.findSession(findOptions);
MongoCollection<E> updated = findOptions.prepare(collection);
FindIterable<E> iterable = clientSession != null
? updated.find(clientSession, query)
: updated.find(query);
Document oldProfile = null;
if (findOptions.isLogQuery()) {
oldProfile = datastore.getDatabase().runCommand(new Document("profile", 2).append("slowms", 0));
}
try {
return findOptions
.apply(iterable, mapper, clazz)
.iterator();
} finally {
if (findOptions.isLogQuery()) {
datastore.getDatabase().runCommand(new Document("profile", oldProfile.get("was"))
.append("slowms", oldProfile.get("slowms"))
.append("sampleRate", oldProfile.get("sampleRate")));
}
}
}
/**
* Performs the operation
*
* @param options the options to apply
* @return the operation result
*/
public T execute(final ModifyOptions options) {
ClientSession session = getDatastore().findSession(options);
Document update = toDocument();
return session == null
? options.prepare(getCollection()).findOneAndUpdate(getQuery().toDocument(), update, options)
: options.prepare(getCollection()).findOneAndUpdate(session, getQuery().toDocument(), update, options);
}
protected <T> void insert(final MongoCollection collection, final T entity, final InsertOneOptions options) {
setInitialVersion(mapper.getMappedClass(entity.getClass()).getVersionField(), entity);
MongoCollection mongoCollection = mapper.enforceWriteConcern(collection, entity.getClass());
ClientSession clientSession = findSession(options);
if (clientSession == null) {
mongoCollection.insertOne(entity, options.getOptions());
} else {
mongoCollection.insertOne(clientSession, entity, options.getOptions());
}
}
@Override
public ClientSession getSession(ClientSessionOptions options) {
return this.mongoDbFactory.getSession(options);
}
@Override
public MongoDbFactory withSession(ClientSession session) {
return this.mongoDbFactory.withSession(session);
}
/**
* Taken from <a href="https://docs.mongodb.com/manual/core/transactions/">https://docs.mongodb.com</a>
*/
@Test
public void shouldExecuteTransactions() {
try (
// creatingMongoDBContainer {
final MongoDBContainer mongoDBContainer = new MongoDBContainer()
// }
) {
// startingMongoDBContainer {
mongoDBContainer.start();
// }
final String mongoRsUrl = mongoDBContainer.getReplicaSetUrl();
assertNotNull(mongoRsUrl);
final MongoClient mongoSyncClient = MongoClients.create(mongoRsUrl);
mongoSyncClient.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
mongoSyncClient.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
final ClientSession clientSession = mongoSyncClient.startSession();
final TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
final String trxResult = "Inserted into collections in different databases";
TransactionBody<String> txnBody = () -> {
final MongoCollection<Document> coll1 =
mongoSyncClient.getDatabase("mydb1").getCollection("foo");
final MongoCollection<Document> coll2 =
mongoSyncClient.getDatabase("mydb2").getCollection("bar");
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return trxResult;
};
try {
final String trxResultActual = clientSession.withTransaction(txnBody, txnOptions);
assertEquals(trxResult, trxResultActual);
} catch (RuntimeException re) {
throw new IllegalStateException(re.getMessage(), re);
} finally {
clientSession.close();
mongoSyncClient.close();
}
}
}
@Override
public void follow(User user, User toFollow) {
// Use the some edge _id for both edge collections
ObjectId edgeId = new ObjectId();
ClientSession clientSession = null;
int txn_retries = 0;
// if there are two collections, then we will be doing two inserts
// and we should wrap them in a transaction
if(config.transactions && config.maintain_following_collection && config.maintain_follower_collection) {
// establish session and start transaction
while (true) {
try {
clientSession = this.client.startSession();
clientSession.startTransaction();
insertEdgeWithId(this.followingMC, edgeId, user, toFollow, clientSession);
insertEdgeWithId(this.followersMC, edgeId, toFollow, user, clientSession);
clientSession.commitTransaction();
if (txn_retries > 0) System.out.println("Committed after " + txn_retries + " retries.");
return;
} catch (MongoCommandException e) {
System.err.println("Couldn't commit follow with " + e.getErrorCode());
if (e.getErrorCode() == 24) {
System.out.println("Lock Timeout... retrying transaction");
} else if (e.getErrorCode() == 11000) {
System.out.println("This is a duplicate edge, not retrying");
return;
} else if (e.getErrorCode() == 251) {
System.out.println("Transaction aborted due to duplicate edge, not retrying");
return;
} else if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
System.out.println("UnknownTransactionCommitResult... retrying transaction");
} else if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
System.out.println("TransientTransactionError, retrying transaction");
} else {
System.out.println("Some other error, retrying");
e.printStackTrace();
}
} finally {
clientSession.close();
txn_retries++; // maybe sleep a bit?
}
}
}
// create the "following" relationship
if(config.maintain_following_collection){
insertEdgeWithId(this.following, edgeId, user, toFollow);
}
// create the reverse "follower" relationship
if(config.maintain_follower_collection){
insertEdgeWithId(this.followers, edgeId, toFollow, user);
}
// if maintaining, update the following and follower
// counts of the two users respectively
if(config.store_follow_counts_with_user){
this.users.update(byUserId(user.getUserId()),
increment(FOLLOWING_COUNT_KEY));
this.users.update(byUserId(toFollow.getUserId()),
increment(FOLLOWER_COUNT_KEY));
}
}
@Override
public void unfollow(User user, User toRemove) {
ClientSession clientSession = null;
int txn_retries = 0;
// if there are two collections, then we will be doing two removes
// and we should wrap them in a transaction
if(config.transactions && config.maintain_following_collection && config.maintain_follower_collection) {
// establish session and start transaction
while (true) {
try {
clientSession = this.client.startSession();
clientSession.startTransaction();
this.followingMC.deleteOne(clientSession, new Document(makeEdge(user, toRemove).toMap()));
this.followersMC.deleteOne(clientSession, new Document(makeEdge(toRemove, user).toMap()));
clientSession.commitTransaction();
if (txn_retries > 0) System.out.println("Committed after " + txn_retries + " retries.");
return;
} catch (MongoCommandException e) {
System.err.println("Couldn't commit unfollow with " + e.getErrorCode());
if (e.getErrorCode() == 24) {
System.out.println("Lock Timeout... retrying transaction");
} else if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
System.out.println("UnknownTransactionCommitResult... retrying transaction");
} else if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
System.out.println("TransientTransactionError, retrying transaction");
} else {
System.out.println("Some other error with unfollow, retrying");
e.printStackTrace();
}
} finally {
clientSession.close();
txn_retries++; // maybe sleep a bit?
}
}
}
// remove the "following" relationship
if(config.maintain_following_collection){
this.following.remove(makeEdge(user, toRemove));
}
// remove the reverse "follower" relationship
if(config.maintain_follower_collection){
this.followers.remove(makeEdge(toRemove, user));
}
// if maintaining, update the following and follower
// counts of the two users respectively
if(config.store_follow_counts_with_user){
this.users.update(byUserId(user.getUserId()),
decrement(FOLLOWING_COUNT_KEY));
this.users.update(byUserId(toRemove.getUserId()),
decrement(FOLLOWER_COUNT_KEY));
}
}
@Override
public ClientSession getSession(ClientSessionOptions options) {
return dataSource.getClient().startSession();
}
@Override
public ClientSession clientSession() {
return clientSession;
}
/**
* @return the session
*/
public ClientSession getSession() {
return session;
}
@Override
public long count(final CountOptions options) {
ClientSession session = datastore.findSession(options);
return session == null ? getCollection().countDocuments(getQueryDocument(), options)
: getCollection().countDocuments(session, getQueryDocument(), options);
}
@Override
public ClientSession clientSession() {
return clientSession;
}
@Override
public ClientSession clientSession() {
return clientSession;
}
@Override
public ClientSession clientSession() {
return clientSession;
}
@Override
public ClientSession clientSession() {
return clientSession;
}
@Override
public ClientSession clientSession() {
return clientSession;
}
@Override
public ClientSession findSession(final SessionConfigurable<?> configurable) {
return configurable.clientSession() != null
? configurable.clientSession()
: getSession();
}
private <T> boolean tryVersionedUpdate(final T entity, final MongoCollection collection, final InsertOneOptions options) {
final MappedClass mc = mapper.getMappedClass(entity.getClass());
if (mc.getVersionField() == null) {
return false;
}
MappedField idField = mc.getIdField();
final Object idValue = idField.getFieldValue(entity);
final MappedField versionField = mc.getVersionField();
Long oldVersion = (Long) versionField.getFieldValue(entity);
long newVersion = oldVersion == null ? 1L : oldVersion + 1;
ClientSession session = findSession(options);
if (newVersion == 1) {
try {
updateVersion(entity, versionField, newVersion);
if (session == null) {
options.prepare(collection).insertOne(entity, options.getOptions());
} else {
options.prepare(collection).insertOne(session, entity, options.getOptions());
}
} catch (MongoWriteException e) {
updateVersion(entity, versionField, oldVersion);
throw new ConcurrentModificationException(Sofia.concurrentModification(entity.getClass().getName(), idValue));
}
} else if (idValue != null) {
final UpdateResult res = find(collection.getNamespace().getCollectionName())
.filter(eq("_id", idValue),
eq(versionField.getMappedFieldName(), oldVersion))
.update(UpdateOperators.set(entity))
.execute(new UpdateOptions()
.bypassDocumentValidation(options.getBypassDocumentValidation())
.clientSession(session)
.writeConcern(options.writeConcern()));
if (res.getModifiedCount() != 1) {
throw new ConcurrentModificationException(Sofia.concurrentModification(entity.getClass().getName(), idValue));
}
updateVersion(entity, versionField, newVersion);
}
return true;
}
@Override
public ClientSession clientSession() {
return clientSession;
}