下面列出了 com.mongodb.client.ChangeStreamIterable 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a supplier of {@link StreamContext} for a change stream opened on a
 * single collection.
 * <p>
 * Each call to the returned supplier obtains a client from
 * {@code connectionSupplier}, resolves the database and then the collection
 * through {@code databaseFn} and {@code collectionFn}, and asks
 * {@code searchFn} for the change stream of that collection.
 *
 * @param connectionSupplier     supplies the client
 * @param databaseFn             resolves the database from the client
 * @param collectionFn           resolves the collection from the database
 * @param destroyFn              handed to the context together with the client
 * @param searchFn               produces the change stream from the collection
 * @param mapFn                  handed to the context for mapping change documents
 * @param startAtOperationTimeFn handed to the context; may be {@code null}
 * @return a supplier which builds a new context on every invocation
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        FunctionEx<? super MongoClient, ? extends MongoDatabase> databaseFn,
        FunctionEx<? super MongoDatabase, ? extends MongoCollection<? extends T>> collectionFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoCollection<? extends T>, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        // client -> database -> collection, resolved in one step
        MongoCollection<? extends T> watchedCollection =
                collectionFn.apply(databaseFn.apply(mongoClient));
        ChangeStreamIterable<? extends T> stream = searchFn.apply(watchedCollection);
        return new StreamContext<>(mongoClient, stream, mapFn, destroyFn, startAtOperationTimeFn);
    };
}
/**
 * Creates a supplier of {@link StreamContext} for a change stream opened on a
 * whole database.
 * <p>
 * Each call to the returned supplier obtains a client from
 * {@code connectionSupplier}, resolves the database through
 * {@code databaseFn} and asks {@code searchFn} for the change stream of that
 * database.
 *
 * @param connectionSupplier     supplies the client
 * @param databaseFn             resolves the database from the client
 * @param destroyFn              handed to the context together with the client
 * @param searchFn               produces the change stream from the database
 * @param mapFn                  handed to the context for mapping change documents
 * @param startAtOperationTimeFn handed to the context; may be {@code null}
 * @return a supplier which builds a new context on every invocation
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        FunctionEx<? super MongoClient, ? extends MongoDatabase> databaseFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoDatabase, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        ChangeStreamIterable<? extends T> stream =
                searchFn.apply(databaseFn.apply(mongoClient));
        return new StreamContext<>(mongoClient, stream, mapFn, destroyFn, startAtOperationTimeFn);
    };
}
/**
 * Creates a supplier of {@link StreamContext} for a change stream opened
 * directly on the client.
 * <p>
 * Each call to the returned supplier obtains a client from
 * {@code connectionSupplier} and asks {@code searchFn} for the change stream
 * of that client.
 *
 * @param connectionSupplier     supplies the client
 * @param destroyFn              handed to the context together with the client
 * @param searchFn               produces the change stream from the client
 * @param mapFn                  handed to the context for mapping change documents
 * @param startAtOperationTimeFn handed to the context; may be {@code null}
 * @return a supplier which builds a new context on every invocation
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        ChangeStreamIterable<? extends T> stream = searchFn.apply(mongoClient);
        return new StreamContext<>(mongoClient, stream, mapFn, destroyFn, startAtOperationTimeFn);
    };
}
/**
 * Stores the collaborators of the stream and computes the initial timestamp.
 *
 * @param client                 the client the change stream belongs to
 * @param changeStreamIterable   the change stream to read from
 * @param mapFn                  maps a change stream document to the emitted item
 * @param destroyFn              stored alongside the client
 * @param startAtOperationTimeFn computes the starting timestamp from the
 *                               client; when {@code null} the timestamp is
 *                               left {@code null}
 */
StreamContext(
        MongoClient client,
        ChangeStreamIterable<? extends T> changeStreamIterable,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    this.client = client;
    this.changeStreamIterable = changeStreamIterable;
    this.mapFn = mapFn;
    this.destroyFn = destroyFn;
    // The start-time function is optional; it is only invoked when supplied.
    if (startAtOperationTimeFn != null) {
        this.timestamp = startAtOperationTimeFn.apply(client);
    } else {
        this.timestamp = null;
    }
}
/**
 * Builds a change-stream source over {@code DB_NAME}.{@code COL_NAME} which
 * starts at the operation time held in {@code startAtOperationTime}.
 *
 * @param filter                   optional match stage; skipped when {@code null}
 * @param projection               optional project stage; skipped when {@code null}
 * @param connectionTimeoutSeconds passed to {@code mongoClient(...)} when the
 *                                 client is created
 * @return a source emitting the full document of each change event
 */
private StreamSource<? extends Document> streamSource(
        Document filter,
        Document projection,
        int connectionTimeoutSeconds
) {
    // Resolve both values here so the lambdas below capture plain locals only.
    String uri = mongoContainer.connectionString();
    long startAtValue = startAtOperationTime.getValue();
    return MongoDBSourceBuilder
            .stream(SOURCE_NAME, () -> mongoClient(uri, connectionTimeoutSeconds))
            .databaseFn(client -> client.getDatabase(DB_NAME))
            .collectionFn(db -> db.getCollection(COL_NAME))
            .destroyFn(MongoClient::close)
            .searchFn(col -> {
                List<Bson> stages = new ArrayList<>();
                if (filter != null) {
                    stages.add(Aggregates.match(filter));
                }
                if (projection != null) {
                    stages.add(Aggregates.project(projection));
                }
                // Without any stage, watch the collection unfiltered.
                ChangeStreamIterable<? extends Document> watch =
                        stages.isEmpty() ? col.watch() : col.watch(stages);
                return watch;
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(startAtValue))
            .build();
}
/**
 * Attempts to open a change stream cursor, optionally resuming from
 * {@code resumeToken}.
 * <p>
 * With a token, {@code startAfter} is used while {@code supportsStartAfter}
 * is set; otherwise {@code resumeAfter} is used unless the cursor has been
 * marked as invalidated. In every other case the stream is opened without an
 * offset.
 * <p>
 * When opening fails with a {@link MongoCommandException} and a token was
 * given, the method retries itself in two situations: on error code 260 it
 * marks the cursor invalidated and retries without a token; on error code 9
 * or 40415 with a message mentioning "startAfter" it clears
 * {@code supportsStartAfter} and retries with the same token.
 *
 * @param sourceConfig the source configuration used to build the change stream
 * @param mongoClient  the client to open the change stream on
 * @param resumeToken  the offset to resume from, or {@code null} to start fresh
 * @return the opened cursor, or {@code null} if the command failed and no
 *         retry applied
 */
private MongoCursor<BsonDocument> tryCreateCursor(
        final MongoSourceConfig sourceConfig,
        final MongoClient mongoClient,
        final BsonDocument resumeToken) {
    final boolean hasResumeToken = resumeToken != null;
    try {
        ChangeStreamIterable<Document> stream = getChangeStreamIterable(sourceConfig, mongoClient);
        if (hasResumeToken && supportsStartAfter) {
            LOGGER.info("Resuming the change stream after the previous offset: {}", resumeToken);
            stream.startAfter(resumeToken);
        } else if (hasResumeToken && !invalidatedCursor) {
            LOGGER.info("Resuming the change stream after the previous offset using resumeAfter.");
            stream.resumeAfter(resumeToken);
        } else {
            LOGGER.info("New change stream cursor created without offset.");
        }
        return stream.withDocumentClass(BsonDocument.class).iterator();
    } catch (MongoCommandException e) {
        final int errorCode = e.getErrorCode();
        // NOTE(review): 260 presumably signals an invalid resume token - confirm.
        if (hasResumeToken && errorCode == 260) {
            invalidatedCursor = true;
            return tryCreateCursor(sourceConfig, mongoClient, null);
        }
        // NOTE(review): 9 / 40415 presumably mean the server rejected the
        // startAfter option - confirm against the server error code list.
        if (hasResumeToken
                && (errorCode == 9 || errorCode == 40415)
                && e.getErrorMessage().contains("startAfter")) {
            supportsStartAfter = false;
            return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
        }
        LOGGER.info("Failed to resume change stream: {} {}", e.getErrorMessage(), errorCode);
        return null;
    }
}
/**
 * Builds the change stream described by {@code sourceConfig}.
 * <p>
 * The scope depends on the configured names: an empty database name watches
 * the whole cluster, an empty collection name watches the database, and
 * otherwise the single collection is watched. When the configuration
 * provides a pipeline it is passed to {@code watch}; otherwise the
 * no-argument {@code watch()} is used.
 * <p>
 * A positive batch size and, when present, the full-document and collation
 * settings are applied to the stream before it is returned.
 *
 * @param sourceConfig the source configuration
 * @param mongoClient  the client to create the change stream on
 * @return the configured change stream
 */
private ChangeStreamIterable<Document> getChangeStreamIterable(
        final MongoSourceConfig sourceConfig, final MongoClient mongoClient) {
    String database = sourceConfig.getString(DATABASE_CONFIG);
    String collection = sourceConfig.getString(COLLECTION_CONFIG);
    Optional<List<Document>> pipeline = sourceConfig.getPipeline();
    ChangeStreamIterable<Document> changeStream;
    // orElseGet defers the pipeline-less watch() so that no extra, unused
    // change stream is built when a pipeline is configured.
    if (database.isEmpty()) {
        LOGGER.info("Watching all changes on the cluster");
        changeStream = pipeline.map(mongoClient::watch).orElseGet(() -> mongoClient.watch());
    } else if (collection.isEmpty()) {
        LOGGER.info("Watching for database changes on '{}'", database);
        MongoDatabase db = mongoClient.getDatabase(database);
        changeStream = pipeline.map(db::watch).orElseGet(() -> db.watch());
    } else {
        LOGGER.info("Watching for collection changes on '{}.{}'", database, collection);
        MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
        changeStream = pipeline.map(coll::watch).orElseGet(() -> coll.watch());
    }
    int batchSize = sourceConfig.getInt(BATCH_SIZE_CONFIG);
    if (batchSize > 0) {
        changeStream.batchSize(batchSize);
    }
    sourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
    sourceConfig.getCollation().ifPresent(changeStream::collation);
    return changeStream;
}
/**
 * Creates a MongoDB stream source that watches a single collection for
 * changes, applying the supplied {@code filter} and {@code projection} to
 * the change stream documents.
 * <p>
 * Change streams require a replica set or a sharded cluster running the
 * WiredTiger storage engine with replica set protocol version 1 (pv1).
 * Deployments using MongoDB's encryption-at-rest feature are supported as
 * well. System collections and collections in the admin, local and config
 * databases cannot be watched.
 * <p>
 * Use {@link MongoDBSourceBuilder} to create custom MongoDB sources.
 * <p>
 * The example below streams inserts on a collection where the field
 * {@code age} is greater than {@code 10}, and projects the emitted
 * document down to the {@code age} field only.
 *
 * <pre>{@code
 * StreamSource<? extends Document> streamSource =
 *         MongoDBSources.stream(
 *                 "stream-source",
 *                 "mongodb://127.0.0.1:27017",
 *                 "myDatabase",
 *                 "myCollection",
 *                 new Document("fullDocument.age", new Document("$gt", 10))
 *                         .append("operationType", "insert"),
 *                 new Document("fullDocument.age", 1)
 *         );
 *
 * Pipeline p = Pipeline.create();
 * StreamSourceStage<? extends Document> srcStage = p.readFrom(streamSource);
 * }</pre>
 *
 * @param name             a descriptive name for the source (diagnostic purposes)
 * @param connectionString a connection string URI to MongoDB, for example
 *                         {@code mongodb://127.0.0.1:27017}
 * @param database         the name of the database
 * @param collection       the name of the collection
 * @param filter           filter object as a {@link Document}; no match
 *                         stage is added when {@code null}
 * @param projection       projection object as a {@link Document}; no
 *                         project stage is added when {@code null}
 */
@Nonnull
public static StreamSource<? extends Document> stream(
        @Nonnull String name,
        @Nonnull String connectionString,
        @Nonnull String database,
        @Nonnull String collection,
        @Nullable Document filter,
        @Nullable Document projection
) {
    return MongoDBSourceBuilder
            .stream(name, () -> MongoClients.create(connectionString))
            .databaseFn(client -> client.getDatabase(database))
            .collectionFn(db -> db.getCollection(collection))
            .destroyFn(MongoClient::close)
            .searchFn(col -> {
                List<Bson> stages = new ArrayList<>();
                if (filter != null) {
                    stages.add(Aggregates.match(filter));
                }
                if (projection != null) {
                    stages.add(Aggregates.project(projection));
                }
                // Without any stage, watch the collection unfiltered.
                ChangeStreamIterable<? extends Document> watch =
                        stages.isEmpty() ? col.watch() : col.watch(stages);
                return watch;
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .build();
}