下面列出了 com.mongodb.MongoNamespace 类 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the copy manager and immediately submits one copy task per namespace.
 *
 * <p>The namespaces are selected from the configured database and collection: an empty database
 * setting delegates to {@code getCollections(mongoClient)}, an empty collection setting delegates
 * to {@code getCollections(mongoClient, database)}, and otherwise only the single configured
 * namespace is used.
 *
 * @param sourceConfig the source configuration supplying the database, collection, queue size
 *     and maximum thread count settings.
 * @param mongoClient the client passed to the namespace lookups.
 */
MongoCopyDataManager(final MongoSourceConfig sourceConfig, final MongoClient mongoClient) {
  this.sourceConfig = sourceConfig;
  this.mongoClient = mongoClient;

  final String databaseName = sourceConfig.getString(DATABASE_CONFIG);
  final String collectionName = sourceConfig.getString(COLLECTION_CONFIG);

  final List<MongoNamespace> targets;
  if (databaseName.isEmpty()) {
    targets = getCollections(mongoClient);
  } else if (collectionName.isEmpty()) {
    targets = getCollections(mongoClient, databaseName);
  } else {
    targets = singletonList(createNamespace(databaseName, collectionName));
  }
  LOGGER.info("Copying existing data on the following namespaces: {}", targets);

  namespacesToCopy = new AtomicInteger(targets.size());
  queue = new ArrayBlockingQueue<>(sourceConfig.getInt(COPY_EXISTING_QUEUE_SIZE_CONFIG));

  // Never more threads than namespaces or than the configured maximum, but always at least one.
  final int cappedThreads =
      Math.min(targets.size(), sourceConfig.getInt(COPY_EXISTING_MAX_THREADS_CONFIG));
  executor = Executors.newFixedThreadPool(Math.max(1, cappedThreads));

  for (final MongoNamespace target : targets) {
    executor.submit(() -> copyDataFrom(target));
  }
}
/**
 * Desynchronizes a single document, identified by its _id, and records a local delete change
 * event for it. Nothing happens when the _id is not being synchronized.
 *
 * @param nsConfig the namespace synchronization config of the namespace where the document
 *                 lives.
 * @param documentId the _id of the document.
 * @return the container produced by desynchronizing the document, with the local delete event
 *         added, or {@code null} when the _id is not being synchronized.
 */
@CheckReturnValue
@Nullable
private LocalSyncWriteModelContainer deleteOneFromRemote(
    final NamespaceSynchronizationConfig nsConfig,
    final BsonValue documentId
) {
  final MongoNamespace ns = nsConfig.getNamespace();
  final Lock nsWriteLock = this.syncConfig.getNamespaceConfig(ns).getLock().writeLock();

  // The lock only guards the "is this document synchronized?" lookup.
  nsWriteLock.lock();
  try {
    if (syncConfig.getSynchronizedDocument(ns, documentId) == null) {
      return null;
    }
  } finally {
    nsWriteLock.unlock();
  }

  final LocalSyncWriteModelContainer result = desyncDocumentsFromRemote(nsConfig, documentId);
  result.addLocalChangeEvent(ChangeEvents.changeEventForLocalDelete(ns, documentId, false));
  return result;
}
/**
 * Builds a {@code SyncFindOperation} for the given namespace from a filter and find options.
 *
 * <p>The filter is converted to a BSON document; projection and sort are converted through
 * {@code BsonUtils.toBsonDocumentOrNull}. The limit is taken from the options unchanged.
 *
 * @param findNamespace the namespace the operation targets.
 * @param filter the query filter.
 * @param resultClass the class results are decoded to.
 * @param options the find options supplying limit, projection and sort.
 * @param <ResultT> the result type.
 * @return the configured find operation.
 */
private <ResultT> SyncFindOperation<ResultT> createSyncFindOperation(
    final MongoNamespace findNamespace,
    final Bson filter,
    final Class<ResultT> resultClass,
    final RemoteFindOptions options
) {
  final BsonDocument bsonFilter = filter.toBsonDocument(documentClass, codecRegistry);
  final BsonDocument bsonProjection =
      BsonUtils.toBsonDocumentOrNull(options.getProjection(), documentClass, codecRegistry);
  final BsonDocument bsonSort =
      BsonUtils.toBsonDocumentOrNull(options.getSort(), documentClass, codecRegistry);

  final SyncFindOperation<ResultT> operation =
      new SyncFindOperation<>(findNamespace, resultClass, dataSynchronizer);
  return operation
      .filter(bsonFilter)
      .limit(options.getLimit())
      .projection(bsonProjection)
      .sort(bsonSort);
}
/**
 * Reads a namespace synchronization config from its BSON document form.
 *
 * <p>The namespace and schema version fields are checked with {@code keyPresent} first
 * (presumably rejecting a document that lacks them; confirm against that helper). Only schema
 * version 1 is accepted.
 *
 * @param document the BSON document holding the namespace and schema version fields.
 * @return a config for the namespace named in the document.
 * @throws IllegalStateException if the schema version is anything other than 1.
 */
static NamespaceSynchronizationConfig fromBsonDocument(final BsonDocument document) {
  keyPresent(ConfigCodec.Fields.NAMESPACE_FIELD, document);
  keyPresent(ConfigCodec.Fields.SCHEMA_VERSION_FIELD, document);

  final int schemaVersion =
      document.getNumber(ConfigCodec.Fields.SCHEMA_VERSION_FIELD).intValue();
  if (schemaVersion != 1) {
    // Report the class actually being decoded here, not the per-document config class.
    throw new IllegalStateException(
        String.format(
            "unexpected schema version '%d' for %s",
            schemaVersion,
            NamespaceSynchronizationConfig.class.getSimpleName()));
  }

  final String fullName = document.getString(ConfigCodec.Fields.NAMESPACE_FIELD).getValue();
  return new NamespaceSynchronizationConfig(new MongoNamespace(fullName));
}
/**
 * Creates a change event.
 *
 * @param id the id of the change event.
 * @param operationType the operation type the change event represents.
 * @param fullDocument the full document at some point after the change is applied.
 * @param ns the namespace (database and collection) of the document.
 * @param documentKey the id of the underlying document that changed.
 * @param updateDescription the description of what has changed (for updates only).
 * @param hasUncommittedWrites whether this represents a local uncommitted write.
 */
public ChangeEvent(
    final BsonDocument id,
    final OperationType operationType,
    final DocumentT fullDocument,
    final MongoNamespace ns,
    final BsonDocument documentKey,
    final UpdateDescription updateDescription,
    final boolean hasUncommittedWrites
) {
  // Everything except the event id and the namespace is stored by the superclass.
  super(operationType, fullDocument, documentKey, updateDescription, hasUncommittedWrites);
  this.ns = ns;
  this.id = id;
}
/**
 * Creates a new findOneAndModify operation.
 *
 * <p>{@code namespace}, {@code methodName}, {@code filter}, {@code update} and {@code decoder}
 * are checked with {@code notNull}; {@code project} and {@code sort} are stored without a check.
 *
 * @param namespace the database and collection namespace for the operation.
 * @param methodName the name of the findOneAndModify function to run.
 * @param filter the filter to query for the document.
 * @param update the update to apply to the resulting document.
 * @param project the projection operation to apply to the returned document.
 * @param sort the sort to use on the query before selecting the first document.
 * @param decoder the decoder for the result documents.
 */
FindOneAndModifyOperation(
    final MongoNamespace namespace,
    final String methodName,
    final BsonDocument filter,
    final BsonDocument update,
    final BsonDocument project,
    final BsonDocument sort,
    final Decoder<T> decoder) {
  // Each required argument is validated right before it is stored; the check order is the
  // same as the parameter order.
  notNull("namespace", namespace);
  this.namespace = namespace;

  notNull("methodName", methodName);
  this.methodName = methodName;

  notNull("filter", filter);
  this.filter = filter;

  notNull("update", update);
  this.update = update;

  notNull("decoder", decoder);
  this.decoder = decoder;

  // Stored as given, without a null check.
  this.project = project;
  this.sort = sort;
}
/**
 * Creates a change stream listener for one namespace, starting with no events and no watchers.
 *
 * @param namespace the namespace this listener is for; also used in the logger name.
 * @param nsConfig the synchronization config of the namespace.
 * @param service the service client stored for later use.
 * @param networkMonitor the network monitor stored for later use.
 * @param authMonitor the auth monitor stored for later use.
 * @param nsLock the read/write lock associated with the namespace.
 */
NamespaceChangeStreamListener(
    final MongoNamespace namespace,
    final NamespaceSynchronizationConfig nsConfig,
    final CoreStitchServiceClient service,
    final NetworkMonitor networkMonitor,
    final AuthMonitor authMonitor,
    final ReadWriteLock nsLock
) {
  // Collaborators handed in by the caller.
  this.namespace = namespace;
  this.nsConfig = nsConfig;
  this.nsLock = nsLock;
  this.service = service;
  this.networkMonitor = networkMonitor;
  this.authMonitor = authMonitor;

  // State owned by this listener.
  this.events = new HashMap<>();
  this.watchers = new HashSet<>();
  this.logger = Loggers.getLogger("NamespaceChangeStreamListener-" + namespace.toString());
}
/**
 * Returns the latest change events for a given namespace.
 *
 * <p>The streamer lookup happens under the instance read lock; the events are fetched from the
 * streamer after the lock has been released.
 *
 * @param namespace the namespace to get events for.
 * @return the events reported by the namespace's streamer, or a new empty map when no streamer
 *         is registered for the namespace.
 */
public Map<BsonValue, CompactChangeEvent<BsonDocument>> getEventsForNamespace(
    final MongoNamespace namespace
) {
  final NamespaceChangeStreamListener listener;
  this.instanceLock.readLock().lock();
  try {
    listener = nsStreamers.get(namespace);
  } finally {
    this.instanceLock.readLock().unlock();
  }

  if (listener != null) {
    return listener.getEvents();
  }
  return new HashMap<>();
}
/**
 * Applies a sync configuration to a namespace and starts this instance if it is not running.
 *
 * <p>The namespace config is updated first. Then, under {@code syncLock}, the exception listener
 * is replaced, the instance is marked as configured, and the sync frequency is reconfigured
 * against the frequency the namespace had before this call.
 *
 * @param namespace the namespace to configure.
 * @param syncConfiguration the configuration to apply.
 */
public void configure(@Nonnull final MongoNamespace namespace,
                      @Nonnull final SyncConfiguration syncConfiguration) {
  this.waitUntilInitialized();

  final NamespaceSynchronizationConfig nsConfig = this.syncConfig.getNamespaceConfig(namespace);

  // Capture the previous frequency before the namespace config is overwritten below.
  final SyncFrequency prevFrequency;
  if (nsConfig.getSyncFrequency() == null) {
    // serves as default prev freq since it has no streams or timers
    prevFrequency = SyncFrequency.onDemand();
  } else {
    prevFrequency = nsConfig.getSyncFrequency();
  }

  nsConfig.configure(syncConfiguration);

  syncLock.lock();
  try {
    this.exceptionListener = syncConfiguration.getExceptionListener();
    this.isConfigured = true;
    this.configureSyncFrequency(namespace, syncConfiguration.getSyncFrequency(), prevFrequency);
  } finally {
    syncLock.unlock();
  }

  if (!isRunning) {
    this.start();
  }
}
/**
 * Builds a change event describing a local replacement of the document with the given _id in
 * the given namespace.
 *
 * @param namespace the namespace where the document was replaced.
 * @param documentId the _id of the document that was replaced.
 * @param document the replacement document.
 * @param writePending passed through as the last argument of the change event constructor.
 * @return a REPLACE change event with an empty event id, the replacement as full document, a
 *         document key of {@code {_id: documentId}} and no update description.
 */
static ChangeEvent<BsonDocument> changeEventForLocalReplace(
    final MongoNamespace namespace,
    final BsonValue documentId,
    final BsonDocument document,
    final boolean writePending
) {
  final BsonDocument emptyEventId = new BsonDocument();
  final BsonDocument documentKey = new BsonDocument("_id", documentId);
  return new ChangeEvent<>(
      emptyEventId, OperationType.REPLACE, document, namespace, documentKey, null, writePending);
}
/**
 * Requests that the documents with the given _ids be synchronized. Actual synchronization of
 * the documents will happen later in a {@link DataSynchronizer#doSyncPass()} iteration.
 *
 * <p>When {@code addSynchronizedDocuments} returns true, a namespace listener is checked for
 * and inserted via {@code checkAndInsertNamespaceListener}.
 *
 * @param namespace the namespace to put the documents in.
 * @param documentIds the _ids of the documents.
 */
public void syncDocumentsFromRemote(
    final MongoNamespace namespace,
    final BsonValue... documentIds
) {
  this.waitUntilInitialized();

  // Enter the group before the try block: if enter() itself fails, the finally clause must not
  // call exit() for an operation that was never entered.
  ongoingOperationsGroup.enter();
  try {
    if (syncConfig.addSynchronizedDocuments(namespace, documentIds)) {
      checkAndInsertNamespaceListener(namespace);
    }
  } finally {
    ongoingOperationsGroup.exit();
  }
}
/**
 * Finds the first document in the local collection for a namespace that matches the filter.
 *
 * <p>The query runs while holding the namespace's write lock and while registered with the
 * ongoing operations group.
 *
 * @param namespace the namespace whose local collection is queried.
 * @param filter the query filter.
 * @param projection the projection applied to the result.
 * @param sort the sort applied before the first document is selected.
 * @param resultClass the class the result is decoded to.
 * @param codecRegistry the codec registry used for the local collection.
 * @param <T> the result type.
 * @return whatever {@code first()} returns for the query.
 */
public <T> T findOne(
    final MongoNamespace namespace,
    final BsonDocument filter,
    final BsonDocument projection,
    final BsonDocument sort,
    final Class<T> resultClass,
    final CodecRegistry codecRegistry
) {
  this.waitUntilInitialized();

  ongoingOperationsGroup.enter();
  try {
    // The lock lookup and acquisition sit inside the outer try so that exit() is still called
    // if either of them throws.
    final Lock lock = this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
    lock.lock();
    try {
      return getLocalCollection(namespace, resultClass, codecRegistry)
          .find(filter)
          .limit(1)
          .projection(projection)
          .sort(sort)
          .first();
    } finally {
      lock.unlock();
    }
  } finally {
    ongoingOperationsGroup.exit();
  }
}
/**
 * Finds documents in the local collection for a namespace that match the filter.
 *
 * <p>The query runs while registered with the ongoing operations group.
 *
 * @param namespace the namespace whose local collection is queried.
 * @param filter the query filter.
 * @param limit the limit passed to the query.
 * @param projection the projection applied to the results.
 * @param sort the sort applied to the results.
 * @param resultClass the class results are decoded to.
 * @param codecRegistry the codec registry used for the local collection.
 * @param <T> the result type.
 * @return the matching documents, collected into a new list.
 */
public <T> Collection<T> find(
    final MongoNamespace namespace,
    final BsonDocument filter,
    final int limit,
    final BsonDocument projection,
    final BsonDocument sort,
    final Class<T> resultClass,
    final CodecRegistry codecRegistry
) {
  this.waitUntilInitialized();

  ongoingOperationsGroup.enter();
  try {
    final Collection<T> matches = new ArrayList<>();
    return getLocalCollection(namespace, resultClass, codecRegistry)
        .find(filter)
        .limit(limit)
        .projection(projection)
        .sort(sort)
        .into(matches);
  } finally {
    ongoingOperationsGroup.exit();
  }
}
/**
 * If there is an unprocessed change event for a particular document ID, fetch it from the
 * appropriate namespace change stream listener, and remove it. By reading the event here, we are
 * assuming it will be processed by the consumer.
 *
 * <p>The listener lookup happens under the instance read lock; the listener itself is queried
 * after the lock has been released.
 *
 * @param namespace the namespace whose listener is consulted.
 * @param documentId the _id of the document to fetch the event for.
 * @return the latest unprocessed change event for the given document ID and namespace, or null
 *         if none exists.
 */
public @Nullable CompactChangeEvent<BsonDocument> getUnprocessedEventForDocumentId(
    final MongoNamespace namespace,
    final BsonValue documentId
) {
  final NamespaceChangeStreamListener listener;
  this.instanceLock.readLock().lock();
  try {
    listener = nsStreamers.get(namespace);
  } finally {
    this.instanceLock.readLock().unlock();
  }

  return listener == null ? null : listener.getUnprocessedEventForDocumentId(documentId);
}
/**
 * Builds a change event describing a local deletion of the document with the given _id in the
 * given namespace.
 *
 * @param namespace the namespace where the document was deleted.
 * @param documentId the _id of the document that was deleted.
 * @param writePending passed through as the last argument of the change event constructor.
 * @return a DELETE change event with an empty event id, no full document, a document key of
 *         {@code {_id: documentId}} and no update description.
 */
static ChangeEvent<BsonDocument> changeEventForLocalDelete(
    final MongoNamespace namespace,
    final BsonValue documentId,
    final boolean writePending
) {
  final BsonDocument emptyEventId = new BsonDocument();
  final BsonDocument documentKey = new BsonDocument("_id", documentId);
  return new ChangeEvent<>(
      emptyEventId, OperationType.DELETE, null, namespace, documentKey, null, writePending);
}
/**
 * Creates a config for a document from only its collection, namespace and _id. Every other
 * constructor argument is given a fixed initial value and a new
 * {@link ReentrantReadWriteLock} is created for the document.
 *
 * @param docsColl the collection of document synchronization configs.
 * @param namespace the namespace the document belongs to.
 * @param documentId the _id of the document.
 */
CoreDocumentSynchronizationConfig(
    final MongoCollection<CoreDocumentSynchronizationConfig> docsColl,
    final MongoNamespace namespace,
    final BsonValue documentId
) {
  this(
      docsColl,
      namespace,
      documentId,
      null,
      -1,
      null,
      new ReentrantReadWriteLock(),
      false,
      false,
      0L);
}
/**
 * Registers a watcher with the change stream listener of the given namespace. Does nothing
 * when no listener is registered for the namespace.
 *
 * @param namespace the namespace whose listener should receive the watcher.
 * @param watcher the callback to register.
 */
@Override
public void addWatcher(final MongoNamespace namespace,
                       final Callback<CompactChangeEvent<BsonDocument>, Object> watcher) {
  // A single lookup: a containsKey() check followed by a separate get() could observe the
  // entry being removed in between and dereference null.
  final NamespaceChangeStreamListener streamer = nsStreamers.get(namespace);
  if (streamer != null) {
    streamer.addWatcher(watcher);
  }
}
/**
 * Returns the set of synchronized namespaces.
 *
 * @return a new set copied, under the instance read lock, from the keys of the namespaces map.
 */
public Set<MongoNamespace> getSynchronizedNamespaces() {
  instanceLock.readLock().lock();
  try {
    final Set<MongoNamespace> snapshot = new HashSet<>(namespaces.keySet());
    return snapshot;
  } finally {
    instanceLock.readLock().unlock();
  }
}
/**
 * Adds a document to the synchronized documents of a namespace and returns its config.
 *
 * @param namespace the namespace the document belongs to.
 * @param documentId the _id of the document.
 * @return the result of looking the document up in the namespace config after adding it.
 */
public CoreDocumentSynchronizationConfig addAndGetSynchronizedDocument(
    final MongoNamespace namespace,
    final BsonValue documentId
) {
  final NamespaceSynchronizationConfig namespaceConfig = getNamespaceConfig(namespace);

  // Register first, then read back through the same namespace config.
  namespaceConfig.addSynchronizedDocument(documentId);
  return namespaceConfig.getSynchronizedDocument(documentId);
}
/**
 * Creates a collection under a random name, renames it within the same database, and checks
 * that only the new name is listed afterwards.
 */
@Test
void renameCollection() {
  String sourceName = randomAlphaString(8);
  String targetName = randomAlphaString(8);
  ReactiveMongoDatabase db = client.getDatabase(DATABASE);

  // Arrange: the source collection exists.
  db.createCollection(sourceName).await().indefinitely();
  assertThat(db.listCollectionNames().collectItems().asList().await().indefinitely())
      .contains(sourceName);

  // Act: rename it within the same database.
  MongoNamespace targetNamespace = new MongoNamespace(DATABASE, targetName);
  ReactiveMongoCollection<Document> sourceCollection = db.getCollection(sourceName);
  sourceCollection.renameCollection(targetNamespace).await().indefinitely();

  // Assert: only the new name is listed.
  assertThat(db.listCollectionNames().collectItems().asList().await().indefinitely())
      .contains(targetName)
      .doesNotContain(sourceName);
}
/**
 * Requests that the given namespace stop being listened to for change events.
 *
 * <p>Under the instance write lock, the namespace's streamer is stopped and then removed from
 * the streamer map. Nothing happens when the namespace has no streamer.
 *
 * @param namespace the namespace to stop listening for change events on.
 */
@Override
public void removeNamespace(final MongoNamespace namespace) {
  this.instanceLock.writeLock().lock();
  try {
    if (this.nsStreamers.containsKey(namespace)) {
      final NamespaceChangeStreamListener listener = this.nsStreamers.get(namespace);
      listener.stop();
      this.nsStreamers.remove(namespace);
    }
  } finally {
    this.instanceLock.writeLock().unlock();
  }
}
/**
 * Reports whether the streamer for the given namespace is open.
 *
 * <p>The check runs under the instance write lock.
 *
 * @param namespace the namespace whose streamer is queried.
 * @return the streamer's {@code isOpen()} result, or {@code false} when the namespace has no
 *         streamer.
 */
public boolean isOpen(final MongoNamespace namespace) {
  instanceLock.writeLock().lock();
  try {
    return nsStreamers.containsKey(namespace) && nsStreamers.get(namespace).isOpen();
  } finally {
    instanceLock.writeLock().unlock();
  }
}
/**
 * Starts the streamer registered for the given namespace, if there is one.
 *
 * <p>The lookup and the start call both run under the instance write lock.
 *
 * @param namespace the namespace whose streamer should be started.
 */
public void start(final MongoNamespace namespace) {
  instanceLock.writeLock().lock();
  try {
    if (!nsStreamers.containsKey(namespace)) {
      return;
    }
    nsStreamers.get(namespace).start();
  } finally {
    instanceLock.writeLock().unlock();
  }
}
/**
 * Creates a new aggregate operation. All three arguments are checked with {@code notNull}.
 *
 * @param namespace the database and collection namespace for the operation.
 * @param pipeline the aggregation pipeline.
 * @param decoder the decoder for the result documents.
 */
AggregateOperation(
    final MongoNamespace namespace,
    final List<BsonDocument> pipeline,
    final Decoder<T> decoder
) {
  // Each argument is validated right before it is stored; the check order is the same as the
  // parameter order.
  notNull("namespace", namespace);
  this.namespace = namespace;

  notNull("pipeline", pipeline);
  this.pipeline = pipeline;

  notNull("decoder", decoder);
  this.decoder = decoder;
}
/**
 * Creates a write model container for a namespace, wired to that namespace's local, remote and
 * undo collections and to the event dispatcher.
 *
 * @param nsConfig the synchronization config of the namespace.
 * @return a new container for the namespace.
 */
private LocalSyncWriteModelContainer newWriteModelContainer(
    final NamespaceSynchronizationConfig nsConfig
) {
  final MongoNamespace ns = nsConfig.getNamespace();
  return new LocalSyncWriteModelContainer(
      nsConfig, getLocalCollection(ns), getRemoteCollection(ns), getUndoCollection(ns),
      eventDispatcher);
}
/**
 * Reports whether a streamer is registered for the given namespace.
 *
 * @param namespace the namespace to look for.
 * @return {@code true} when the streamer map, read under the instance read lock, contains the
 *         namespace.
 */
public boolean hasNamespace(final MongoNamespace namespace) {
  instanceLock.readLock().lock();
  try {
    final boolean registered = nsStreamers.containsKey(namespace);
    return registered;
  } finally {
    instanceLock.readLock().unlock();
  }
}
/**
 * Builds a filter document that matches on the namespace field.
 *
 * @param namespace the namespace to filter by.
 * @return a new document with a single entry mapping the namespace field to the namespace's
 *         string form.
 */
static BsonDocument getNsFilter(
    final MongoNamespace namespace
) {
  final BsonString namespaceValue = new BsonString(namespace.toString());
  return new BsonDocument(ConfigCodec.Fields.NAMESPACE_FIELD, namespaceValue);
}
/**
 * Returns the set of synchronized documents _ids in a namespace.
 *
 * <p>If the lookup is interrupted, the thread's interrupt flag is set again and an empty set
 * is returned instead.
 *
 * @param namespace the namespace to get synchronized documents _ids for.
 * @return the set of synchronized documents _ids in a namespace.
 */
public Set<BsonValue> getSynchronizedDocumentIds(final MongoNamespace namespace) {
  Set<BsonValue> documentIds;
  try {
    documentIds = getNamespaceConfig(namespace).getSynchronizedDocumentIds();
  } catch (InterruptedException interrupted) {
    // Restore the interrupt flag for callers further up the stack.
    Thread.currentThread().interrupt();
    documentIds = new HashSet<>();
  }
  return documentIds;
}
/**
 * Creates a new insert-one operation. Both arguments are stored as given, without validation.
 *
 * @param namespace the namespace for the operation.
 * @param document the document for the operation.
 */
InsertOneOperation(
    final MongoNamespace namespace,
    final BsonDocument document
) {
  this.document = document;
  this.namespace = namespace;
}
/**
 * Returns the local collection for the given namespace, typed for raw {@link BsonDocument}
 * operations and using the default codec registry from {@code MongoClientSettings}.
 *
 * @param namespace the namespace referring to the local collection.
 * @return the local collection representing the given namespace for raw document operations.
 */
MongoCollection<BsonDocument> getLocalCollection(final MongoNamespace namespace) {
  // Delegates to the three-argument overload with fixed document class and codec registry.
  return getLocalCollection(
      namespace, BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry());
}