类com.mongodb.client.model.changestream.ChangeStreamDocument源码实例Demo

下面列出了怎么用com.mongodb.client.model.changestream.ChangeStreamDocument的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mongo-kafka   文件: MongoSourceTask.java
/**
 * This method also is responsible for caching the {@code resumeAfter} value for the change
 * stream.
 */
private void setCachedResultAndResumeToken() {
  MongoChangeStreamCursor<ChangeStreamDocument<Document>> changeStreamCursor =
      getChangeStreamIterable(sourceConfig, mongoClient).cursor();
  ChangeStreamDocument<Document> firstResult = changeStreamCursor.tryNext();
  if (firstResult != null) {
    cachedResult =
        new BsonDocumentWrapper<>(
            firstResult,
            ChangeStreamDocument.createCodec(
                Document.class, MongoClientSettings.getDefaultCodecRegistry()));
  }
  cachedResumeToken =
      firstResult != null ? firstResult.getResumeToken() : changeStreamCursor.getResumeToken();
  changeStreamCursor.close();
}
 
StreamContext(
        MongoClient client,
        ChangeStreamIterable<? extends T> changeStreamIterable,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    this.client = client;
    this.changeStreamIterable = changeStreamIterable;
    this.mapFn = mapFn;
    this.destroyFn = destroyFn;

    this.timestamp = startAtOperationTimeFn == null ? null : startAtOperationTimeFn.apply(client);
}
 
源代码3 项目: hazelcast-jet-contrib   文件: MongoDBSourceTest.java
private StreamSource<? extends Document> streamSource(
        Document filter,
        Document projection,
        int connectionTimeoutSeconds
) {
    String connectionString = mongoContainer.connectionString();
    long value = startAtOperationTime.getValue();
    return MongoDBSourceBuilder
            .stream(SOURCE_NAME, () -> mongoClient(connectionString, connectionTimeoutSeconds))
            .databaseFn(client -> client.getDatabase(DB_NAME))
            .collectionFn(db -> db.getCollection(COL_NAME))
            .destroyFn(MongoClient::close)
            .searchFn(col -> {
                List<Bson> aggregates = new ArrayList<>();
                if (filter != null) {
                    aggregates.add(Aggregates.match(filter));
                }
                if (projection != null) {
                    aggregates.add(Aggregates.project(projection));
                }
                ChangeStreamIterable<? extends Document> watch;
                if (aggregates.isEmpty()) {
                    watch = col.watch();
                } else {
                    watch = col.watch(aggregates);
                }
                return watch;
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(value))
            .build();
}
 
源代码4 项目: pulsar   文件: MongoSourceTest.java
@Test
public void testWriteBadMessage() throws Exception {

    source.open(map, mockSourceContext);

    subscriber.onNext(new ChangeStreamDocument<Document>(null, new MongoNamespace("hello.pulsar"),
            new Document("hello", "pulsar"), new BsonDocument("_id", new BsonString("id")), OperationType.INSERT, null));

    Record<byte[]> record = source.read();

    assertEquals(new String(record.getValue()),
            "{\"fullDocument\":{\"hello\":\"pulsar\"},"
            + "\"ns\":{\"databaseName\":\"hello\",\"collectionName\":\"pulsar\",\"fullName\":\"hello.pulsar\"},"
            + "\"operation\":\"INSERT\"}");
}
 
源代码5 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(ClientSession clientSession, Class<D> clazz,
        ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, collection.watch(clientSession, clazz)));
}
 
源代码6 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(List<? extends Bson> pipeline, Class<D> clazz,
        ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, collection.watch(pipeline, clazz)));
}
 
源代码7 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline) {
    return Wrappers.toMulti(client.watch(pipeline));
}
 
源代码8 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch() {
    return Wrappers.toMulti(client.watch());
}
 
源代码9 项目: syncer   文件: MongoV4MasterConnector.java
private SyncData fromChangeStream(ChangeStreamDocument<Document> d, MongoDataId dataId) {
  MongoNamespace namespace = d.getNamespace();
  HashMap<String, Object> full = new HashMap<>(), updated = null;
  SimpleEventType type;
  switch (d.getOperationType()) {
    case UPDATE:
      // e.g. members.6.state -> {[email protected]} "BsonInt32{value=1}"
      // e.g. addToSet  -> all elements in bson array
      type = SimpleEventType.UPDATE;
      assert d.getUpdateDescription() != null;

      UpdateDescription updateDescription = d.getUpdateDescription();
      updated = new HashMap<>(getUpdatedFields(d.getFullDocument(), updateDescription.getUpdatedFields(), bsonConversion));
      if (d.getFullDocument() != null) {
        full.putAll(getFullDocument(d));
        // use UpdateDescription to overrides latest version
      } else {
        full.put(ID, getId(d));
      }
      full.putAll(updated);
      addRemovedFields(updated, d.getUpdateDescription().getRemovedFields());
      break;
    case DELETE:
      type = SimpleEventType.DELETE;
      full.put(ID, getId(d));
      break;
    case INSERT:
    case REPLACE: // write will overwrite for ES, not suitable for other output
      type = SimpleEventType.WRITE;
      full.putAll(getFullDocument(d));
      break;
    case OTHER:
    case INVALIDATE:
    case RENAME:
    case DROP:
    case DROP_DATABASE:
    default:
      return null;
  }
  return new SyncData(dataId, type, namespace.getDatabaseName(), namespace.getCollectionName(), ID, full.get(ID), new NamedChangeStream(full, updated));
}
 
源代码10 项目: hazelcast-jet-contrib   文件: MongoDBSourceTest.java
@Test
public void testStream_whenWatchDatabase() {
    IList<Document> list = jet.getList("list");

    String connectionString = mongoContainer.connectionString();
    long value = startAtOperationTime.getValue();

    StreamSource<? extends Document> source = MongoDBSourceBuilder
            .streamDatabase(SOURCE_NAME, () -> MongoClients.create(connectionString))
            .databaseFn(client -> client.getDatabase(DB_NAME))
            .destroyFn(MongoClient::close)
            .searchFn(db -> {
                List<Bson> aggregates = new ArrayList<>();
                aggregates.add(Aggregates.match(new Document("fullDocument.val", new Document("$gte", 10))
                        .append("operationType", "insert")));

                aggregates.add(Aggregates.project(new Document("fullDocument.val", 1).append("_id", 1)));
                return db.watch(aggregates);
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(value))
            .build();


    Pipeline p = Pipeline.create();
    p.readFrom(source)
     .withNativeTimestamps(0)
     .writeTo(Sinks.list(list));

    Job job = jet.newJob(p);

    MongoCollection<Document> col1 = collection("col1");
    MongoCollection<Document> col2 = collection("col2");

    col1.insertOne(new Document("val", 1));
    col1.insertOne(new Document("val", 10).append("foo", "bar"));

    col2.insertOne(new Document("val", 2));
    col2.insertOne(new Document("val", 11).append("foo", "bar"));

    assertTrueEventually(() -> {
        assertEquals(2, list.size());
        list.forEach(document -> assertNull(document.get("foo")));

        assertEquals(10, list.get(0).get("val"));
        assertEquals(11, list.get(1).get("val"));

    });

    col1.insertOne(new Document("val", 3));
    col1.insertOne(new Document("val", 12).append("foo", "bar"));

    col2.insertOne(new Document("val", 4));
    col2.insertOne(new Document("val", 13).append("foo", "bar"));

    assertTrueEventually(() -> {
        assertEquals(4, list.size());
        list.forEach(document -> assertNull(document.get("foo")));

        assertEquals(12, list.get(2).get("val"));
        assertEquals(13, list.get(3).get("val"));
    });

    job.cancel();

}
 
源代码11 项目: hazelcast-jet-contrib   文件: MongoDBSourceTest.java
@Test
public void testStream_whenWatchAll() {
    IList<Document> list = jet.getList("list");

    String connectionString = mongoContainer.connectionString();
    long value = startAtOperationTime.getValue();

    StreamSource<? extends Document> source = MongoDBSourceBuilder
            .streamAll(SOURCE_NAME, () -> MongoClients.create(connectionString))
            .destroyFn(MongoClient::close)
            .searchFn(client -> {
                List<Bson> aggregates = new ArrayList<>();
                aggregates.add(Aggregates.match(new Document("fullDocument.val", new Document("$gt", 10))
                        .append("operationType", "insert")));

                aggregates.add(Aggregates.project(new Document("fullDocument.val", 1).append("_id", 1)));
                return client.watch(aggregates);
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(value))
            .build();

    Pipeline p = Pipeline.create();
    p.readFrom(source)
     .withNativeTimestamps(0)
     .writeTo(Sinks.list(list));

    Job job = jet.newJob(p);

    MongoCollection<Document> col1 = collection("db1", "col1");
    MongoCollection<Document> col2 = collection("db1", "col2");
    MongoCollection<Document> col3 = collection("db2", "col3");

    col1.insertOne(new Document("val", 1));
    col1.insertOne(new Document("val", 11).append("foo", "bar"));
    col2.insertOne(new Document("val", 2));
    col2.insertOne(new Document("val", 12).append("foo", "bar"));
    col3.insertOne(new Document("val", 3));
    col3.insertOne(new Document("val", 13).append("foo", "bar"));

    assertTrueEventually(() -> {
        assertEquals(3, list.size());
        list.forEach(document -> assertNull(document.get("foo")));

        assertEquals(11, list.get(0).get("val"));
        assertEquals(12, list.get(1).get("val"));
        assertEquals(13, list.get(2).get("val"));
    });

    col1.insertOne(new Document("val", 4));
    col1.insertOne(new Document("val", 14).append("foo", "bar"));
    col2.insertOne(new Document("val", 5));
    col2.insertOne(new Document("val", 15).append("foo", "bar"));
    col2.insertOne(new Document("val", 6));
    col2.insertOne(new Document("val", 16).append("foo", "bar"));

    assertTrueEventually(() -> {
        assertEquals(6, list.size());
        list.forEach(document -> assertNull(document.get("foo")));

        assertEquals(14, list.get(3).get("val"));
        assertEquals(15, list.get(4).get("val"));
        assertEquals(16, list.get(5).get("val"));
    });

    job.cancel();

}
 
源代码12 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    ChangeStreamPublisher<Document> publisher = apply(options, client.watch(pipeline));
    return Wrappers.toMulti(publisher);
}
 
源代码13 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        Class<D> clazz) {
    return Wrappers.toMulti(collection.watch(clientSession, pipeline, clazz));
}
 
源代码14 项目: syncer   文件: MongoV4MasterConnector.java
static Object getId(ChangeStreamDocument<Document> d) {
  BsonDocument documentKey = d.getDocumentKey();
  BsonValue o = documentKey.get(ID);
  return MongoTypeUtil.convertBson(o);
}
 
源代码15 项目: syncer   文件: MongoV4MasterConnector.java
private Map getFullDocument(ChangeStreamDocument<Document> d) {
  return (Map) MongoTypeUtil.convertBsonTypes(d.getFullDocument());
}
 
源代码16 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(List<? extends Bson> pipeline, Class<T> clazz) {
    return Wrappers.toMulti(client.watch(pipeline, clazz));
}
 
源代码17 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        Class<D> clazz, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, collection.watch(clientSession, pipeline, clazz)));
}
 
源代码18 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, collection.watch(clientSession)));
}
 
源代码19 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, collection.watch(pipeline)));
}
 
源代码20 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public <D> Multi<ChangeStreamDocument<D>> watch(Class<D> clazz, ChangeStreamOptions options) {
    return Wrappers.toMulti(apply(options, collection.watch(clazz)));
}
 
源代码21 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(List<? extends Bson> pipeline, Class<T> clazz,
        ChangeStreamOptions options) {
    ChangeStreamPublisher<T> publisher = apply(options, client.watch(pipeline, clazz));
    return Wrappers.toMulti(publisher);
}
 
源代码22 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        ChangeStreamOptions options) {
    ChangeStreamPublisher<Document> publisher = apply(options, client.watch(clientSession, pipeline));
    return Wrappers.toMulti(publisher);
}
 
源代码23 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(ClientSession clientSession, Class<T> clazz) {
    return Wrappers.toMulti(client.watch(clientSession, clazz));
}
 
源代码24 项目: quarkus   文件: ReactiveMongoClientImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, ChangeStreamOptions options) {
    ChangeStreamPublisher<Document> publisher = apply(options, client.watch(clientSession));
    return Wrappers.toMulti(publisher);
}
 
源代码25 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch() {
    return Wrappers.toMulti(database.watch());
}
 
源代码26 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(ChangeStreamOptions options) {
    return null;
}
 
源代码27 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(Class<T> clazz) {
    return Wrappers.toMulti(database.watch(clazz));
}
 
源代码28 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(Class<T> clazz, ChangeStreamOptions options) {
    return null;
}
 
源代码29 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline) {
    return Wrappers.toMulti(database.watch(pipeline));
}
 
源代码30 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    return null;
}
 
 类所在包
 类方法
 同包方法