下面列出了 com.mongodb.client.model.changestream.ChangeStreamDocument API 类的实例代码及用法;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Reads (at most) the first event from a fresh change-stream cursor and caches it together
 * with the {@code resumeAfter} token so the stream can later be resumed from this point.
 *
 * <p>If no event is immediately available, the cursor's own resume token (the
 * post-batch resume token) is cached instead and {@code cachedResult} is left untouched.
 */
private void setCachedResultAndResumeToken() {
    // try-with-resources: the original leaked the cursor if tryNext() or codec creation threw.
    try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> changeStreamCursor =
            getChangeStreamIterable(sourceConfig, mongoClient).cursor()) {
        ChangeStreamDocument<Document> firstResult = changeStreamCursor.tryNext();
        if (firstResult != null) {
            // Wrap the event so it can be re-encoded later with the default codec registry.
            cachedResult =
                    new BsonDocumentWrapper<>(
                            firstResult,
                            ChangeStreamDocument.createCodec(
                                    Document.class, MongoClientSettings.getDefaultCodecRegistry()));
        }
        cachedResumeToken =
                firstResult != null ? firstResult.getResumeToken() : changeStreamCursor.getResumeToken();
    }
}
/**
 * Captures the per-source state of a Mongo change-stream reader: the client, the configured
 * change-stream iterable, the event-mapping function, the cleanup callback and the optional
 * operation time at which reading should start.
 */
StreamContext(
        MongoClient client,
        ChangeStreamIterable<? extends T> changeStreamIterable,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    this.client = client;
    this.changeStreamIterable = changeStreamIterable;
    this.mapFn = mapFn;
    this.destroyFn = destroyFn;
    // Resolve the start timestamp eagerly; no function means no explicit start point.
    if (startAtOperationTimeFn == null) {
        this.timestamp = null;
    } else {
        this.timestamp = startAtOperationTimeFn.apply(client);
    }
}
/**
 * Builds a change-stream source over {@code COL_NAME}, optionally filtering and projecting
 * the change events, starting at the previously captured operation time.
 *
 * @param filter                   optional {@code $match} document, or {@code null} for none
 * @param projection               optional {@code $project} document, or {@code null} for none
 * @param connectionTimeoutSeconds client connection timeout passed to the client factory
 */
private StreamSource<? extends Document> streamSource(
        Document filter,
        Document projection,
        int connectionTimeoutSeconds
) {
    String connectionString = mongoContainer.connectionString();
    long startTime = startAtOperationTime.getValue();
    return MongoDBSourceBuilder
            .stream(SOURCE_NAME, () -> mongoClient(connectionString, connectionTimeoutSeconds))
            .databaseFn(client -> client.getDatabase(DB_NAME))
            .collectionFn(db -> db.getCollection(COL_NAME))
            .destroyFn(MongoClient::close)
            .searchFn(col -> {
                // Assemble the aggregation pipeline from the optional stages.
                List<Bson> pipeline = new ArrayList<>();
                if (filter != null) {
                    pipeline.add(Aggregates.match(filter));
                }
                if (projection != null) {
                    pipeline.add(Aggregates.project(projection));
                }
                return pipeline.isEmpty() ? col.watch() : col.watch(pipeline);
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(startTime))
            .build();
}
/**
 * Verifies that an INSERT change-stream event pushed through the subscriber is serialized
 * by the source into the expected JSON payload.
 */
@Test
public void testWriteBadMessage() throws Exception {
    source.open(map, mockSourceContext);
    subscriber.onNext(new ChangeStreamDocument<Document>(null, new MongoNamespace("hello.pulsar"),
            new Document("hello", "pulsar"), new BsonDocument("_id", new BsonString("id")), OperationType.INSERT, null));
    Record<byte[]> record = source.read();
    // JUnit convention is assertEquals(expected, actual); the original had the arguments
    // swapped, which yields misleading failure messages.
    assertEquals(
            "{\"fullDocument\":{\"hello\":\"pulsar\"},"
            + "\"ns\":{\"databaseName\":\"hello\",\"collectionName\":\"pulsar\",\"fullName\":\"hello.pulsar\"},"
            + "\"operation\":\"INSERT\"}",
            new String(record.getValue()));
}
/** Watches the collection inside the given session, decoding events as {@code clazz} with the supplied options. */
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(ClientSession clientSession, Class<D> clazz,
        ChangeStreamOptions options) {
    ChangeStreamPublisher<D> publisher = collection.watch(clientSession, clazz);
    return Wrappers.toMulti(apply(options, publisher));
}
/** Watches the collection through the given aggregation pipeline, decoding events as {@code clazz}. */
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(List<? extends Bson> pipeline, Class<D> clazz,
        ChangeStreamOptions options) {
    ChangeStreamPublisher<D> publisher = collection.watch(pipeline, clazz);
    return Wrappers.toMulti(apply(options, publisher));
}
/** Opens a client-wide change stream filtered by the given aggregation pipeline. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline) {
    ChangeStreamPublisher<Document> publisher = client.watch(pipeline);
    return Wrappers.toMulti(publisher);
}
/** Opens an unfiltered client-wide change stream. */
@Override
public Multi<ChangeStreamDocument<Document>> watch() {
    ChangeStreamPublisher<Document> publisher = client.watch();
    return Wrappers.toMulti(publisher);
}
/**
 * Maps a raw change-stream event onto the connector's {@code SyncData} model.
 *
 * <p>UPDATE events become {@code SimpleEventType.UPDATE} carrying both the resulting document
 * state ({@code full}) and the per-update delta ({@code updated}); DELETE events carry only the
 * document id; INSERT/REPLACE become WRITE events with the complete document. All other
 * operation types (drop, rename, invalidate, ...) are not synchronized.
 *
 * @param d      the change-stream event
 * @param dataId identifier of the originating stream position
 * @return the mapped event, or {@code null} if the operation type is not handled
 */
private SyncData fromChangeStream(ChangeStreamDocument<Document> d, MongoDataId dataId) {
    MongoNamespace namespace = d.getNamespace();
    // "full" accumulates the document state to emit; "updated" is only populated for UPDATE events.
    HashMap<String, Object> full = new HashMap<>(), updated = null;
    SimpleEventType type;
    switch (d.getOperationType()) {
    case UPDATE:
        // e.g. dotted-path field "members.6.state" -> BsonInt32{value=1}
        // e.g. $addToSet -> all elements in the resulting bson array
        type = SimpleEventType.UPDATE;
        assert d.getUpdateDescription() != null;
        UpdateDescription updateDescription = d.getUpdateDescription();
        updated = new HashMap<>(getUpdatedFields(d.getFullDocument(), updateDescription.getUpdatedFields(), bsonConversion));
        if (d.getFullDocument() != null) {
            full.putAll(getFullDocument(d));
            // use UpdateDescription to override the latest version
        } else {
            // No full document on the event (presumably lookup disabled) — emit at least the id.
            full.put(ID, getId(d));
        }
        full.putAll(updated);
        // NOTE(review): removed fields are appended to "updated" AFTER it was merged into
        // "full", so they appear only in the delta view — confirm this ordering is intentional.
        addRemovedFields(updated, d.getUpdateDescription().getRemovedFields());
        break;
    case DELETE:
        type = SimpleEventType.DELETE;
        // Only the document id is available for deletes.
        full.put(ID, getId(d));
        break;
    case INSERT:
    case REPLACE: // write will overwrite for ES, not suitable for other output
        type = SimpleEventType.WRITE;
        full.putAll(getFullDocument(d));
        break;
    case OTHER:
    case INVALIDATE:
    case RENAME:
    case DROP:
    case DROP_DATABASE:
    default:
        // Structural/unknown events carry no document data to synchronize.
        return null;
    }
    return new SyncData(dataId, type, namespace.getDatabaseName(), namespace.getCollectionName(), ID, full.get(ID), new NamedChangeStream(full, updated));
}
/**
 * Verifies that a database-level change stream (streamDatabase) sees inserts from every
 * collection of the database and that the $match / $project stages are applied: only inserts
 * with val >= 10 pass, and every field except _id and val is projected away (so "foo"
 * must never reach the sink).
 */
@Test
public void testStream_whenWatchDatabase() {
    IList<Document> list = jet.getList("list");
    String connectionString = mongoContainer.connectionString();
    // Operation time captured before the test; the stream replays events from this point.
    long value = startAtOperationTime.getValue();
    StreamSource<? extends Document> source = MongoDBSourceBuilder
            .streamDatabase(SOURCE_NAME, () -> MongoClients.create(connectionString))
            .databaseFn(client -> client.getDatabase(DB_NAME))
            .destroyFn(MongoClient::close)
            .searchFn(db -> {
                List<Bson> aggregates = new ArrayList<>();
                // Only insert events whose document has val >= 10.
                aggregates.add(Aggregates.match(new Document("fullDocument.val", new Document("$gte", 10))
                        .append("operationType", "insert")));
                // Keep just _id and val in the emitted documents.
                aggregates.add(Aggregates.project(new Document("fullDocument.val", 1).append("_id", 1)));
                return db.watch(aggregates);
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(value))
            .build();
    Pipeline p = Pipeline.create();
    p.readFrom(source)
            .withNativeTimestamps(0)
            .writeTo(Sinks.list(list));
    Job job = jet.newJob(p);
    MongoCollection<Document> col1 = collection("col1");
    MongoCollection<Document> col2 = collection("col2");
    // Two matching inserts (val 10, 11) across two collections, two filtered out (val 1, 2).
    col1.insertOne(new Document("val", 1));
    col1.insertOne(new Document("val", 10).append("foo", "bar"));
    col2.insertOne(new Document("val", 2));
    col2.insertOne(new Document("val", 11).append("foo", "bar"));
    assertTrueEventually(() -> {
        assertEquals(2, list.size());
        // The projection must have stripped the "foo" field.
        list.forEach(document -> assertNull(document.get("foo")));
        assertEquals(10, list.get(0).get("val"));
        assertEquals(11, list.get(1).get("val"));
    });
    // Repeat while the job is already running: only val 12 and 13 pass the filter.
    col1.insertOne(new Document("val", 3));
    col1.insertOne(new Document("val", 12).append("foo", "bar"));
    col2.insertOne(new Document("val", 4));
    col2.insertOne(new Document("val", 13).append("foo", "bar"));
    assertTrueEventually(() -> {
        assertEquals(4, list.size());
        list.forEach(document -> assertNull(document.get("foo")));
        assertEquals(12, list.get(2).get("val"));
        assertEquals(13, list.get(3).get("val"));
    });
    job.cancel();
}
/**
 * Verifies that a deployment-wide change stream (streamAll) sees inserts from every database
 * and collection, with the $match / $project stages applied: only inserts with val > 10 pass,
 * and every field except _id and val is projected away (so "foo" must never reach the sink).
 */
@Test
public void testStream_whenWatchAll() {
    IList<Document> list = jet.getList("list");
    String connectionString = mongoContainer.connectionString();
    // Operation time captured before the test; the stream replays events from this point.
    long value = startAtOperationTime.getValue();
    StreamSource<? extends Document> source = MongoDBSourceBuilder
            .streamAll(SOURCE_NAME, () -> MongoClients.create(connectionString))
            .destroyFn(MongoClient::close)
            .searchFn(client -> {
                List<Bson> aggregates = new ArrayList<>();
                // Only insert events whose document has val > 10 (strictly greater here).
                aggregates.add(Aggregates.match(new Document("fullDocument.val", new Document("$gt", 10))
                        .append("operationType", "insert")));
                // Keep just _id and val in the emitted documents.
                aggregates.add(Aggregates.project(new Document("fullDocument.val", 1).append("_id", 1)));
                return client.watch(aggregates);
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(value))
            .build();
    Pipeline p = Pipeline.create();
    p.readFrom(source)
            .withNativeTimestamps(0)
            .writeTo(Sinks.list(list));
    Job job = jet.newJob(p);
    // Collections spread over two databases to prove the stream is deployment-wide.
    MongoCollection<Document> col1 = collection("db1", "col1");
    MongoCollection<Document> col2 = collection("db1", "col2");
    MongoCollection<Document> col3 = collection("db2", "col3");
    // Three matching inserts (val 11, 12, 13), three filtered out (val 1, 2, 3).
    col1.insertOne(new Document("val", 1));
    col1.insertOne(new Document("val", 11).append("foo", "bar"));
    col2.insertOne(new Document("val", 2));
    col2.insertOne(new Document("val", 12).append("foo", "bar"));
    col3.insertOne(new Document("val", 3));
    col3.insertOne(new Document("val", 13).append("foo", "bar"));
    assertTrueEventually(() -> {
        assertEquals(3, list.size());
        // The projection must have stripped the "foo" field.
        list.forEach(document -> assertNull(document.get("foo")));
        assertEquals(11, list.get(0).get("val"));
        assertEquals(12, list.get(1).get("val"));
        assertEquals(13, list.get(2).get("val"));
    });
    // Repeat while the job is already running: only val 14, 15 and 16 pass the filter.
    col1.insertOne(new Document("val", 4));
    col1.insertOne(new Document("val", 14).append("foo", "bar"));
    col2.insertOne(new Document("val", 5));
    col2.insertOne(new Document("val", 15).append("foo", "bar"));
    col2.insertOne(new Document("val", 6));
    col2.insertOne(new Document("val", 16).append("foo", "bar"));
    assertTrueEventually(() -> {
        assertEquals(6, list.size());
        list.forEach(document -> assertNull(document.get("foo")));
        assertEquals(14, list.get(3).get("val"));
        assertEquals(15, list.get(4).get("val"));
        assertEquals(16, list.get(5).get("val"));
    });
    job.cancel();
}
/** Opens a client-wide change stream for the given pipeline, honouring the supplied options. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, client.watch(pipeline)));
}
/** Watches the collection within the session, running the pipeline and decoding events as {@code clazz}. */
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        Class<D> clazz) {
    ChangeStreamPublisher<D> publisher = collection.watch(clientSession, pipeline, clazz);
    return Wrappers.toMulti(publisher);
}
/** Extracts the primary key ({@code _id}) from the event's document key and converts it to a plain Java value. */
static Object getId(ChangeStreamDocument<Document> d) {
    return MongoTypeUtil.convertBson(d.getDocumentKey().get(ID));
}
/**
 * Converts the event's full document into a plain Java map via {@code MongoTypeUtil}.
 * Parameterized return type replaces the raw {@code Map} to avoid unchecked use at call sites.
 */
@SuppressWarnings("unchecked") // convertBsonTypes returns Object; a Document converts to a Map
private Map<String, Object> getFullDocument(ChangeStreamDocument<Document> d) {
    return (Map<String, Object>) MongoTypeUtil.convertBsonTypes(d.getFullDocument());
}
/** Opens a client-wide change stream for the pipeline, decoding events as {@code clazz}. */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(List<? extends Bson> pipeline, Class<T> clazz) {
    ChangeStreamPublisher<T> publisher = client.watch(pipeline, clazz);
    return Wrappers.toMulti(publisher);
}
/** Watches the collection in the session with pipeline, target type and options all supplied. */
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        Class<D> clazz, ChangeStreamOptions options) {
    ChangeStreamPublisher<D> publisher = collection.watch(clientSession, pipeline, clazz);
    return Wrappers.toMulti(apply(options, publisher));
}
/** Watches the collection within the session, honouring the supplied options. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, ChangeStreamOptions options) {
    ChangeStreamPublisher<Document> publisher = collection.watch(clientSession);
    return Wrappers.toMulti(apply(options, publisher));
}
/** Watches the collection through the given pipeline, honouring the supplied options. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    ChangeStreamPublisher<Document> publisher = collection.watch(pipeline);
    return Wrappers.toMulti(apply(options, publisher));
}
/** Watches the collection, decoding events as {@code clazz} and honouring the supplied options. */
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(Class<D> clazz, ChangeStreamOptions options) {
    ChangeStreamPublisher<D> publisher = collection.watch(clazz);
    return Wrappers.toMulti(apply(options, publisher));
}
/** Opens a client-wide change stream for the pipeline, decoding as {@code clazz} with the given options. */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(List<? extends Bson> pipeline, Class<T> clazz,
        ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, client.watch(pipeline, clazz)));
}
/** Opens a client-wide change stream in the session for the pipeline, honouring the given options. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, client.watch(clientSession, pipeline)));
}
/** Opens a client-wide change stream in the session, decoding events as {@code clazz}. */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(ClientSession clientSession, Class<T> clazz) {
    ChangeStreamPublisher<T> publisher = client.watch(clientSession, clazz);
    return Wrappers.toMulti(publisher);
}
/** Opens a client-wide change stream in the session, honouring the supplied options. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, client.watch(clientSession)));
}
/** Opens an unfiltered change stream over the whole database. */
@Override
public Multi<ChangeStreamDocument<Document>> watch() {
    ChangeStreamPublisher<Document> publisher = database.watch();
    return Wrappers.toMulti(publisher);
}
/**
 * Opens a database-level change stream, honouring the supplied options.
 * The original returned {@code null}, breaking callers; implemented consistently with
 * the sibling overloads that route through {@code apply(options, ...)}.
 */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, database.watch()));
}
/** Opens a database-level change stream, decoding events as {@code clazz}. */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(Class<T> clazz) {
    ChangeStreamPublisher<T> publisher = database.watch(clazz);
    return Wrappers.toMulti(publisher);
}
/**
 * Opens a database-level change stream decoded as {@code clazz}, honouring the options.
 * The original returned {@code null}, breaking callers; implemented consistently with
 * the sibling overloads that route through {@code apply(options, ...)}.
 */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(Class<T> clazz, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, database.watch(clazz)));
}
/** Opens a database-level change stream filtered by the given aggregation pipeline. */
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline) {
    ChangeStreamPublisher<Document> publisher = database.watch(pipeline);
    return Wrappers.toMulti(publisher);
}
/**
 * Opens a database-level change stream for the pipeline, honouring the supplied options.
 * The original returned {@code null}, breaking callers; implemented consistently with
 * the sibling overloads that route through {@code apply(options, ...)}.
 */
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, database.watch(pipeline)));
}