下面列出了 com.mongodb.reactivestreams.client.ChangeStreamPublisher 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Applies the change stream settings held by this object to the given publisher.
 *
 * <p>Each setting is forwarded only when it is set: {@code collation}, {@code fullDocument},
 * {@code resumeToken} and {@code startAtOperationTime} are skipped when {@code null}, and
 * {@code maxAwaitTime} is skipped unless it is greater than zero. The settings are applied
 * in that order: collation, max await time, full document, resume token, start time.
 *
 * @param stream the publisher to configure
 * @param <T> the type parameter of the publisher
 * @return the publisher returned by the last setting that was applied, or {@code stream}
 *         itself when no setting was applied
 */
public <T> ChangeStreamPublisher<T> apply(ChangeStreamPublisher<T> stream) {
    final ChangeStreamPublisher<T> withCollation = collation != null
            ? stream.collation(collation)
            : stream;
    final ChangeStreamPublisher<T> withAwaitTime = maxAwaitTime > 0
            ? withCollation.maxAwaitTime(maxAwaitTime, maxAwaitTimeUnit)
            : withCollation;
    final ChangeStreamPublisher<T> withFullDocument = fullDocument != null
            ? withAwaitTime.fullDocument(fullDocument)
            : withAwaitTime;
    final ChangeStreamPublisher<T> withResumeToken = resumeToken != null
            ? withFullDocument.resumeAfter(resumeToken)
            : withFullDocument;
    return startAtOperationTime != null
            ? withResumeToken.startAtOperationTime(startAtOperationTime)
            : withResumeToken;
}
/**
 * Builds a fresh set of mocks before each test method.
 *
 * <p>The mocked client, database and collection are chained so that
 * {@code getDatabase(..).getCollection(..).watch()} yields the mocked publisher. The
 * publisher returns itself from {@code batchSize} and {@code fullDocument}, and any
 * subscriber handed to {@code subscribe} is stored in the {@code subscriber} field.
 */
@BeforeMethod
public void setUp() {
    // Plain fixtures.
    map = TestHelper.createMap(true);
    mockSourceContext = mock(SourceContext.class);

    // Publisher mock: fluent setters return the mock itself.
    mockPublisher = mock(ChangeStreamPublisher.class);
    when(mockPublisher.batchSize(anyInt())).thenReturn(mockPublisher);
    when(mockPublisher.fullDocument(any())).thenReturn(mockPublisher);
    // Capture the subscriber instead of starting a real subscription.
    doAnswer(call -> {
        subscriber = call.getArgument(0, Subscriber.class);
        return null;
    }).when(mockPublisher).subscribe(any());

    // Collection -> publisher.
    mockMongoColl = mock(MongoCollection.class);
    when(mockMongoColl.watch()).thenReturn(mockPublisher);

    // Database -> collection.
    mockMongoDb = mock(MongoDatabase.class);
    when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);

    // Client -> database.
    mockMongoClient = mock(MongoClient.class);
    when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);

    // The source under test obtains its client from this supplier.
    source = new MongoSource(() -> mockMongoClient);
}
/**
 * Applies the given options to a change stream publisher, if any options were supplied.
 *
 * @param options the options to apply; may be {@code null}
 * @param watch the publisher to configure
 * @param <D> the type parameter of the publisher
 * @return {@code watch} unchanged when {@code options} is {@code null}, otherwise the
 *         result of {@code options.apply(watch)}
 */
private <D> ChangeStreamPublisher<D> apply(ChangeStreamOptions options, ChangeStreamPublisher<D> watch) {
    return options == null ? watch : options.apply(watch);
}
/**
 * Applies the given options to a change stream publisher, if any options were supplied.
 *
 * @param options the options to apply; may be {@code null}
 * @param publisher the publisher to configure
 * @param <T> the type parameter of the publisher
 * @return {@code publisher} unchanged when {@code options} is {@code null}, otherwise the
 *         result of {@code options.apply(publisher)}
 */
private <T> ChangeStreamPublisher<T> apply(ChangeStreamOptions options, ChangeStreamPublisher<T> publisher) {
    return options != null ? options.apply(publisher) : publisher;
}
/**
 * Opens a change stream through {@code client.watch()}, passes it with the given options
 * to this class's {@code apply} helper, and converts the result with
 * {@code Wrappers.toMulti}.
 *
 * @param options the change stream options handed to {@code apply}
 * @return the change stream as a {@code Multi}
 */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ChangeStreamOptions options) {
    final ChangeStreamPublisher<Document> rawStream = client.watch();
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(clazz)}, passes it with the given
 * options to this class's {@code apply} helper, and converts the result with
 * {@code Wrappers.toMulti}.
 *
 * @param clazz the class passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @param <T> the type parameter of the change stream documents
 * @return the change stream as a {@code Multi}
 */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(Class<T> clazz, ChangeStreamOptions options) {
    final ChangeStreamPublisher<T> rawStream = client.watch(clazz);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(pipeline)}, passes it with the given
 * options to this class's {@code apply} helper, and converts the result with
 * {@code Wrappers.toMulti}.
 *
 * @param pipeline the pipeline passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @return the change stream as a {@code Multi}
 */
@Override
public Multi<ChangeStreamDocument<Document>> watch(List<? extends Bson> pipeline, ChangeStreamOptions options) {
    final ChangeStreamPublisher<Document> rawStream = client.watch(pipeline);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(pipeline, clazz)}, passes it with the
 * given options to this class's {@code apply} helper, and converts the result with
 * {@code Wrappers.toMulti}.
 *
 * @param pipeline the pipeline passed to the underlying {@code watch} call
 * @param clazz the class passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @param <T> the type parameter of the change stream documents
 * @return the change stream as a {@code Multi}
 */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(List<? extends Bson> pipeline, Class<T> clazz,
        ChangeStreamOptions options) {
    final ChangeStreamPublisher<T> rawStream = client.watch(pipeline, clazz);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(clientSession)}, passes it with the
 * given options to this class's {@code apply} helper, and converts the result with
 * {@code Wrappers.toMulti}.
 *
 * @param clientSession the session passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @return the change stream as a {@code Multi}
 */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, ChangeStreamOptions options) {
    final ChangeStreamPublisher<Document> rawStream = client.watch(clientSession);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(clientSession, clazz)}, passes it with
 * the given options to this class's {@code apply} helper, and converts the result with
 * {@code Wrappers.toMulti}.
 *
 * @param clientSession the session passed to the underlying {@code watch} call
 * @param clazz the class passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @param <T> the type parameter of the change stream documents
 * @return the change stream as a {@code Multi}
 */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(ClientSession clientSession, Class<T> clazz,
        ChangeStreamOptions options) {
    final ChangeStreamPublisher<T> rawStream = client.watch(clientSession, clazz);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(clientSession, pipeline)}, passes it
 * with the given options to this class's {@code apply} helper, and converts the result
 * with {@code Wrappers.toMulti}.
 *
 * @param clientSession the session passed to the underlying {@code watch} call
 * @param pipeline the pipeline passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @return the change stream as a {@code Multi}
 */
@Override
public Multi<ChangeStreamDocument<Document>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        ChangeStreamOptions options) {
    final ChangeStreamPublisher<Document> rawStream = client.watch(clientSession, pipeline);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Opens a change stream through {@code client.watch(clientSession, pipeline, clazz)},
 * passes it with the given options to this class's {@code apply} helper, and converts the
 * result with {@code Wrappers.toMulti}.
 *
 * @param clientSession the session passed to the underlying {@code watch} call
 * @param pipeline the pipeline passed to the underlying {@code watch} call
 * @param clazz the class passed to the underlying {@code watch} call
 * @param options the change stream options handed to {@code apply}
 * @param <T> the type parameter of the change stream documents
 * @return the change stream as a {@code Multi}
 */
@Override
public <T> Multi<ChangeStreamDocument<T>> watch(ClientSession clientSession, List<? extends Bson> pipeline,
        Class<T> clazz, ChangeStreamOptions options) {
    final ChangeStreamPublisher<T> rawStream = client.watch(clientSession, pipeline, clazz);
    return Wrappers.toMulti(apply(options, rawStream));
}
/**
 * Delegates to {@code mongoClient.watch()}.
 *
 * @return the publisher produced by the wrapped client
 */
@Override
public ChangeStreamPublisher<Document> watch() {
    final ChangeStreamPublisher<Document> delegated = mongoClient.watch();
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(tResultClass)}.
 *
 * @param tResultClass the class forwarded to the wrapped client
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher produced by the wrapped client
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final Class<TResult> tResultClass) {
    final ChangeStreamPublisher<TResult> delegated = mongoClient.watch(tResultClass);
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(pipeline)}.
 *
 * @param pipeline the pipeline forwarded to the wrapped client
 * @return the publisher produced by the wrapped client
 */
@Override
public ChangeStreamPublisher<Document> watch(final List<? extends Bson> pipeline) {
    final ChangeStreamPublisher<Document> delegated = mongoClient.watch(pipeline);
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(pipeline, tResultClass)}.
 *
 * @param pipeline the pipeline forwarded to the wrapped client
 * @param tResultClass the class forwarded to the wrapped client
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher produced by the wrapped client
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final List<? extends Bson> pipeline,
        final Class<TResult> tResultClass) {
    final ChangeStreamPublisher<TResult> delegated = mongoClient.watch(pipeline, tResultClass);
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(clientSession)}.
 *
 * @param clientSession the session forwarded to the wrapped client
 * @return the publisher produced by the wrapped client
 */
@Override
public ChangeStreamPublisher<Document> watch(final ClientSession clientSession) {
    final ChangeStreamPublisher<Document> delegated = mongoClient.watch(clientSession);
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(clientSession, tResultClass)}.
 *
 * @param clientSession the session forwarded to the wrapped client
 * @param tResultClass the class forwarded to the wrapped client
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher produced by the wrapped client
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final ClientSession clientSession,
        final Class<TResult> tResultClass) {
    final ChangeStreamPublisher<TResult> delegated = mongoClient.watch(clientSession, tResultClass);
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(clientSession, pipeline)}.
 *
 * @param clientSession the session forwarded to the wrapped client
 * @param pipeline the pipeline forwarded to the wrapped client
 * @return the publisher produced by the wrapped client
 */
@Override
public ChangeStreamPublisher<Document> watch(final ClientSession clientSession,
        final List<? extends Bson> pipeline) {
    final ChangeStreamPublisher<Document> delegated = mongoClient.watch(clientSession, pipeline);
    return delegated;
}
/**
 * Delegates to {@code mongoClient.watch(clientSession, pipeline, tResultClass)}.
 *
 * @param clientSession the session forwarded to the wrapped client
 * @param pipeline the pipeline forwarded to the wrapped client
 * @param tResultClass the class forwarded to the wrapped client
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher produced by the wrapped client
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final ClientSession clientSession,
        final List<? extends Bson> pipeline, final Class<TResult> tResultClass) {
    final ChangeStreamPublisher<TResult> delegated =
            mongoClient.watch(clientSession, pipeline, tResultClass);
    return delegated;
}
/**
 * Opens a change stream with {@code Document} as the result class by forwarding to
 * {@code watch(Class)}.
 *
 * @return the publisher returned by {@code watch(Document.class)}
 */
@Override
public ChangeStreamPublisher<Document> watch() {
    final Class<Document> defaultResultClass = Document.class;
    return watch(defaultResultClass);
}
/**
 * Opens a change stream with an empty pipeline by forwarding to
 * {@code watch(List, Class)}.
 *
 * @param resultClass the class forwarded to the pipeline overload
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher returned by the pipeline overload
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final Class<TResult> resultClass) {
    final List<Bson> emptyPipeline = Collections.emptyList();
    return watch(emptyPipeline, resultClass);
}
/**
 * Opens a change stream for the given pipeline with {@code Document} as the result class
 * by forwarding to {@code watch(List, Class)}.
 *
 * @param pipeline the pipeline forwarded to the two-argument overload
 * @return the publisher returned by the two-argument overload
 */
@Override
public ChangeStreamPublisher<Document> watch(final List<? extends Bson> pipeline) {
    final Class<Document> defaultResultClass = Document.class;
    return watch(pipeline, defaultResultClass);
}
/**
 * Calls {@code wrapped.watch(pipeline, resultClass)} and wraps the result in a new
 * {@code ChangeStreamPublisherImpl}.
 *
 * @param pipeline the pipeline forwarded to the wrapped client
 * @param resultClass the class forwarded to the wrapped client
 * @param <TResult> the type parameter of the returned publisher
 * @return a new publisher around the wrapped client's change stream
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
    final ChangeStreamPublisher<TResult> publisher =
            new ChangeStreamPublisherImpl<TResult>(wrapped.watch(pipeline, resultClass));
    return publisher;
}
/**
 * Opens a change stream in the given session with {@code Document} as the result class
 * by forwarding to {@code watch(ClientSession, Class)}.
 *
 * @param clientSession the session forwarded to the two-argument overload
 * @return the publisher returned by the two-argument overload
 */
@Override
public ChangeStreamPublisher<Document> watch(final ClientSession clientSession) {
    final Class<Document> defaultResultClass = Document.class;
    return watch(clientSession, defaultResultClass);
}
/**
 * Opens a change stream in the given session with an empty pipeline by forwarding to
 * {@code watch(ClientSession, List, Class)}.
 *
 * @param clientSession the session forwarded to the three-argument overload
 * @param resultClass the class forwarded to the three-argument overload
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher returned by the three-argument overload
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final ClientSession clientSession, final Class<TResult> resultClass) {
    final List<Bson> emptyPipeline = Collections.emptyList();
    return watch(clientSession, emptyPipeline, resultClass);
}
/**
 * Opens a change stream in the given session for the given pipeline with {@code Document}
 * as the result class by forwarding to {@code watch(ClientSession, List, Class)}.
 *
 * @param clientSession the session forwarded to the three-argument overload
 * @param pipeline the pipeline forwarded to the three-argument overload
 * @return the publisher returned by the three-argument overload
 */
@Override
public ChangeStreamPublisher<Document> watch(final ClientSession clientSession, final List<? extends Bson> pipeline) {
    final Class<Document> defaultResultClass = Document.class;
    return watch(clientSession, pipeline, defaultResultClass);
}
/**
 * Calls {@code wrapped.watch} with the session obtained from
 * {@code clientSession.getWrapped()}, the pipeline and the result class, and wraps the
 * result in a new {@code ChangeStreamPublisherImpl}.
 *
 * @param clientSession the session whose {@code getWrapped()} value is forwarded
 * @param pipeline the pipeline forwarded to the wrapped client
 * @param resultClass the class forwarded to the wrapped client
 * @param <TResult> the type parameter of the returned publisher
 * @return a new publisher around the wrapped client's change stream
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final ClientSession clientSession, final List<? extends Bson> pipeline,
        final Class<TResult> resultClass) {
    final ChangeStreamPublisher<TResult> publisher = new ChangeStreamPublisherImpl<TResult>(
            wrapped.watch(clientSession.getWrapped(), pipeline, resultClass));
    return publisher;
}
/**
 * Opens a change stream with an empty pipeline by forwarding to {@code watch(List)}.
 *
 * @return the publisher returned by the pipeline overload
 */
@Override
public ChangeStreamPublisher<Document> watch() {
    final List<Bson> emptyPipeline = Collections.emptyList();
    return watch(emptyPipeline);
}
/**
 * Opens a change stream with an empty pipeline by forwarding to
 * {@code watch(List, Class)}.
 *
 * @param resultClass the class forwarded to the pipeline overload
 * @param <TResult> the type parameter of the returned publisher
 * @return the publisher returned by the pipeline overload
 */
@Override
public <TResult> ChangeStreamPublisher<TResult> watch(final Class<TResult> resultClass) {
    final List<Bson> emptyPipeline = Collections.emptyList();
    return watch(emptyPipeline, resultClass);
}