类com.mongodb.client.ChangeStreamIterable源码实例Demo

下面列出了如何使用 com.mongodb.client.ChangeStreamIterable 的 API 类实例代码及写法,或者点击链接到 GitHub 查看源代码。

/**
 * Creates a supplier of {@code StreamContext} for a collection-level change stream.
 * <p>
 * Nothing is connected when this method is called; each invocation of the
 * returned supplier obtains a client from {@code connectionSupplier}, resolves
 * the database and then the collection, and asks {@code searchFn} for the
 * change stream to consume.
 *
 * @param connectionSupplier     supplies the {@code MongoClient}
 * @param databaseFn             resolves the database from the client
 * @param collectionFn           resolves the collection from the database
 * @param destroyFn              handed to the context together with the client
 * @param searchFn               produces the change stream from the collection
 * @param mapFn                  handed to the context; maps change stream documents to {@code U}
 * @param startAtOperationTimeFn handed to the context; the context constructor
 *                               shown alongside tolerates {@code null} here
 * @return a supplier which builds a new context on every call
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        FunctionEx<? super MongoClient, ? extends MongoDatabase> databaseFn,
        FunctionEx<? super MongoDatabase, ? extends MongoCollection<? extends T>> collectionFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoCollection<? extends T>, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn

) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        // client -> database -> collection, in that order
        MongoCollection<? extends T> watchedCollection =
                collectionFn.apply(databaseFn.apply(mongoClient));
        ChangeStreamIterable<? extends T> changes = searchFn.apply(watchedCollection);
        return new StreamContext<>(mongoClient, changes, mapFn, destroyFn, startAtOperationTimeFn);
    };
}
 
/**
 * Creates a supplier of {@code StreamContext} for a database-level change stream.
 * <p>
 * Each invocation of the returned supplier obtains a client from
 * {@code connectionSupplier}, resolves the database and asks {@code searchFn}
 * for the change stream to consume.
 *
 * @param connectionSupplier     supplies the {@code MongoClient}
 * @param databaseFn             resolves the database from the client
 * @param destroyFn              handed to the context together with the client
 * @param searchFn               produces the change stream from the database
 * @param mapFn                  handed to the context; maps change stream documents to {@code U}
 * @param startAtOperationTimeFn handed to the context; the context constructor
 *                               shown alongside tolerates {@code null} here
 * @return a supplier which builds a new context on every call
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        FunctionEx<? super MongoClient, ? extends MongoDatabase> databaseFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoDatabase, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        // The database handle is only needed to create the change stream.
        ChangeStreamIterable<? extends T> changes = searchFn.apply(databaseFn.apply(mongoClient));
        return new StreamContext<>(mongoClient, changes, mapFn, destroyFn, startAtOperationTimeFn);
    };
}
 
/**
 * Creates a supplier of {@code StreamContext} for a change stream that is
 * created directly from the client.
 * <p>
 * Each invocation of the returned supplier obtains a client from
 * {@code connectionSupplier} and asks {@code searchFn} for the change stream
 * to consume.
 *
 * @param connectionSupplier     supplies the {@code MongoClient}
 * @param destroyFn              handed to the context together with the client
 * @param searchFn               produces the change stream from the client
 * @param mapFn                  handed to the context; maps change stream documents to {@code U}
 * @param startAtOperationTimeFn handed to the context; the context constructor
 *                               shown alongside tolerates {@code null} here
 * @return a supplier which builds a new context on every call
 */
private static <T, U> SupplierEx<StreamContext<T, U>> contextFn(
        SupplierEx<? extends MongoClient> connectionSupplier,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends ChangeStreamIterable<? extends T>> searchFn,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    return () -> {
        MongoClient mongoClient = connectionSupplier.get();
        ChangeStreamIterable<? extends T> changes = searchFn.apply(mongoClient);
        StreamContext<T, U> context =
                new StreamContext<>(mongoClient, changes, mapFn, destroyFn, startAtOperationTimeFn);
        return context;
    };
}
 
/**
 * Stores the collaborators of the stream and resolves the start timestamp.
 *
 * @param client                 the client the change stream was created from
 * @param changeStreamIterable   the change stream to consume
 * @param mapFn                  maps a change stream document to the emitted item
 * @param destroyFn              stored for later use with the client
 * @param startAtOperationTimeFn computes the start timestamp from the client;
 *                               when {@code null}, the timestamp is left {@code null}
 */
StreamContext(
        MongoClient client,
        ChangeStreamIterable<? extends T> changeStreamIterable,
        FunctionEx<? super ChangeStreamDocument<? extends T>, U> mapFn,
        ConsumerEx<? super MongoClient> destroyFn,
        FunctionEx<? super MongoClient, ? extends BsonTimestamp> startAtOperationTimeFn
) {
    this.client = client;
    this.changeStreamIterable = changeStreamIterable;
    this.mapFn = mapFn;
    this.destroyFn = destroyFn;

    // The timestamp function is optional; it is applied once, here.
    if (startAtOperationTimeFn != null) {
        this.timestamp = startAtOperationTimeFn.apply(client);
    } else {
        this.timestamp = null;
    }
}
 
源代码5 项目: hazelcast-jet-contrib   文件: MongoDBSourceTest.java
/**
 * Builds the streaming source used by the tests.
 * <p>
 * The source watches collection {@code COL_NAME} of database {@code DB_NAME}
 * and emits the full document of every change event. A {@code $match} stage is
 * added for a non-null {@code filter} and a {@code $project} stage for a
 * non-null {@code projection}; with neither, the collection is watched without
 * a pipeline.
 *
 * @param filter                   match document, or {@code null} for none
 * @param projection               projection document, or {@code null} for none
 * @param connectionTimeoutSeconds passed on to {@code mongoClient(...)}
 * @return the assembled stream source
 */
private StreamSource<? extends Document> streamSource(
        Document filter,
        Document projection,
        int connectionTimeoutSeconds
) {
    // Both values are read eagerly, before any of the lambdas below is created.
    String uri = mongoContainer.connectionString();
    long startValue = startAtOperationTime.getValue();
    return MongoDBSourceBuilder
            .stream(SOURCE_NAME, () -> mongoClient(uri, connectionTimeoutSeconds))
            .databaseFn(client -> client.getDatabase(DB_NAME))
            .collectionFn(db -> db.getCollection(COL_NAME))
            .destroyFn(MongoClient::close)
            .searchFn(col -> {
                // Stage order matters: match first, then project.
                List<Bson> stages = new ArrayList<>();
                if (filter != null) {
                    stages.add(Aggregates.match(filter));
                }
                if (projection != null) {
                    stages.add(Aggregates.project(projection));
                }
                ChangeStreamIterable<? extends Document> changes =
                        stages.isEmpty() ? col.watch() : col.watch(stages);
                return changes;
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .startAtOperationTimeFn(client -> new BsonTimestamp(startValue))
            .build();
}
 
源代码6 项目: mongo-kafka   文件: MongoSourceTask.java
/**
 * Attempts to open a change stream cursor, resuming from {@code resumeToken}
 * when one is given.
 *
 * <p>With a token, {@code startAfter} is used while {@code supportsStartAfter}
 * is set; otherwise {@code resumeAfter} is used unless the cursor has been
 * flagged as invalidated. In every other case the stream is opened without an
 * offset.
 *
 * <p>When opening fails with a {@code MongoCommandException} and a token was
 * supplied, two failures trigger a retry:
 * <ul>
 *   <li>error code 260: {@code invalidatedCursor} is set and the cursor is
 *       retried without a token;</li>
 *   <li>error code 9 or 40415 with a message mentioning "startAfter":
 *       {@code supportsStartAfter} is cleared and the cursor is retried with
 *       the same token.</li>
 * </ul>
 * Any other command failure is logged and reported as {@code null}.
 *
 * @param sourceConfig the source configuration used to build the change stream
 * @param mongoClient  the client to open the change stream on
 * @param resumeToken  the token to resume from, or {@code null} to start fresh
 * @return the opened cursor, or {@code null} if it could not be created
 */
private MongoCursor<BsonDocument> tryCreateCursor(
    final MongoSourceConfig sourceConfig,
    final MongoClient mongoClient,
    final BsonDocument resumeToken) {
  final boolean hasResumeToken = resumeToken != null;
  try {
    ChangeStreamIterable<Document> stream = getChangeStreamIterable(sourceConfig, mongoClient);
    if (hasResumeToken && supportsStartAfter) {
      LOGGER.info("Resuming the change stream after the previous offset: {}", resumeToken);
      stream.startAfter(resumeToken);
    } else if (hasResumeToken && !invalidatedCursor) {
      LOGGER.info("Resuming the change stream after the previous offset using resumeAfter.");
      stream.resumeAfter(resumeToken);
    } else {
      LOGGER.info("New change stream cursor created without offset.");
    }
    return stream.withDocumentClass(BsonDocument.class).iterator();
  } catch (MongoCommandException e) {
    if (hasResumeToken) {
      final int errorCode = e.getErrorCode();
      if (errorCode == 260) {
        // The token is no longer usable: remember that and start without one.
        invalidatedCursor = true;
        return tryCreateCursor(sourceConfig, mongoClient, null);
      }
      if ((errorCode == 9 || errorCode == 40415)
          && e.getErrorMessage().contains("startAfter")) {
        // startAfter was rejected: the retry takes the resumeAfter path instead.
        supportsStartAfter = false;
        return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
      }
    }
    LOGGER.info("Failed to resume change stream: {} {}", e.getErrorMessage(), e.getErrorCode());
    return null;
  }
}
 
源代码7 项目: mongo-kafka   文件: MongoSourceTask.java
/**
 * Creates the change stream described by the source configuration.
 *
 * <p>The scope of the stream depends on which names are configured:
 * <ul>
 *   <li>empty database name: the whole cluster is watched;</li>
 *   <li>database name but empty collection name: the database is watched;</li>
 *   <li>both names: the single collection is watched.</li>
 * </ul>
 * When the configuration provides a pipeline it is passed to {@code watch};
 * otherwise the no-argument {@code watch()} is used. A positive batch size, the
 * full-document option and the collation are applied when configured.
 *
 * @param sourceConfig the source configuration to read the settings from
 * @param mongoClient  the client to create the change stream on
 * @return the configured change stream
 */
private ChangeStreamIterable<Document> getChangeStreamIterable(
    final MongoSourceConfig sourceConfig, final MongoClient mongoClient) {
  String database = sourceConfig.getString(DATABASE_CONFIG);
  String collection = sourceConfig.getString(COLLECTION_CONFIG);

  Optional<List<Document>> pipeline = sourceConfig.getPipeline();
  ChangeStreamIterable<Document> changeStream;
  // orElseGet (rather than orElse) defers the pipeline-less watch() call so that
  // exactly one change stream iterable is created per invocation.
  if (database.isEmpty()) {
    LOGGER.info("Watching all changes on the cluster");
    changeStream = pipeline.map(mongoClient::watch).orElseGet(() -> mongoClient.watch());
  } else if (collection.isEmpty()) {
    LOGGER.info("Watching for database changes on '{}'", database);
    MongoDatabase db = mongoClient.getDatabase(database);
    changeStream = pipeline.map(db::watch).orElseGet(() -> db.watch());
  } else {
    LOGGER.info("Watching for collection changes on '{}.{}'", database, collection);
    MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
    changeStream = pipeline.map(coll::watch).orElseGet(() -> coll.watch());
  }

  // A non-positive batch size leaves the batch size unset on the stream.
  int batchSize = sourceConfig.getInt(BATCH_SIZE_CONFIG);
  if (batchSize > 0) {
    changeStream.batchSize(batchSize);
  }
  sourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
  sourceConfig.getCollation().ifPresent(changeStream::collation);
  return changeStream;
}
 
源代码8 项目: hazelcast-jet-contrib   文件: MongoDBSources.java
/**
 * Creates a MongoDB stream source that watches a single collection for
 * changes and emits the full document of each change event. The optional
 * {@code filter} and {@code projection} are applied to the change stream
 * documents, in that order; when both are {@code null} the collection is
 * watched without any pipeline.
 * <p>
 * Change streams are available for replica sets and sharded clusters that
 * use the WiredTiger storage engine and replica set protocol version 1
 * (pv1). They can also be used on deployments which employ MongoDB's
 * encryption-at-rest feature. System collections and collections in the
 * admin, local and config databases cannot be watched.
 * <p>
 * Use {@link MongoDBSourceBuilder} to create custom MongoDB sources.
 * <p>
 * The following example streams inserts on a collection where the field
 * {@code age} is greater than {@code 10}, and projects the emitted
 * document down to the {@code age} field only.
 *
 * <pre>{@code
 * StreamSource<? extends Document> streamSource =
 *         MongoDBSources.stream(
 *                 "stream-source",
 *                 "mongodb://127.0.0.1:27017",
 *                 "myDatabase",
 *                 "myCollection",
 *                 new Document("fullDocument.age", new Document("$gt", 10))
 *                         .append("operationType", "insert"),
 *                 new Document("fullDocument.age", 1)
 *         );
 *
 * Pipeline p = Pipeline.create();
 * StreamSourceStage<? extends Document> srcStage = p.readFrom(streamSource);
 * }</pre>
 *
 * @param name             a descriptive name for the source (diagnostic purposes)
 * @param connectionString a connection string URI to MongoDB, for example
 *                         {@code mongodb://127.0.0.1:27017}
 * @param database         the name of the database
 * @param collection       the name of the collection
 * @param filter           filter object as a {@link Document}, or {@code null}
 *                         to skip filtering
 * @param projection       projection object as a {@link Document}, or
 *                         {@code null} to skip projecting
 * @return a stream source emitting the full documents of the observed changes
 */
@Nonnull
public static StreamSource<? extends Document> stream(
        @Nonnull String name,
        @Nonnull String connectionString,
        @Nonnull String database,
        @Nonnull String collection,
        @Nullable Document filter,
        @Nullable Document projection
) {
    return MongoDBSourceBuilder
            .stream(name, () -> MongoClients.create(connectionString))
            .databaseFn(client -> client.getDatabase(database))
            .collectionFn(db -> db.getCollection(collection))
            .destroyFn(MongoClient::close)
            .searchFn(col -> {
                // Stage order matters: match first, then project.
                List<Bson> stages = new ArrayList<>();
                if (filter != null) {
                    stages.add(Aggregates.match(filter));
                }
                if (projection != null) {
                    stages.add(Aggregates.project(projection));
                }
                ChangeStreamIterable<? extends Document> changes =
                        stages.isEmpty() ? col.watch() : col.watch(stages);
                return changes;
            })
            .mapFn(ChangeStreamDocument::getFullDocument)
            .build();
}
 
 类所在包
 类方法
 同包方法