下面列出了 com.mongodb.reactivestreams.client.Success 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Drops the {@code news} collection of the {@code news} database and refills it with
 * {@code elements} generated documents.
 *
 * <p>Generated documents are grouped into 500 ms time windows and every non-empty window is
 * written with one {@code insertMany} call. When {@code elements} equals {@link Long#MAX_VALUE}
 * or {@link Integer#MAX_VALUE} the pipeline is started with {@code subscribe()}; for any other
 * value it is run with {@code blockingSubscribe()}.
 *
 * @param elements number of documents to generate; nothing happens for values {@code <= 0}
 */
private void prepareItemsInDatabase(long elements) {
    if (elements <= 0) {
        return;
    }

    MongoCollection<News> collection = mongoClient().getDatabase("news")
            .getCollection("news", News.class);

    Flowable<Success> successFlowable = Flowable.fromPublisher(collection.drop())
            .ignoreElements()
            .andThen(Flowable.rangeLong(0L, elements)
                    .map(l -> NewsHarness.generate())
                    .buffer(500, TimeUnit.MILLISECONDS)
                    // A time window can close (or the stream can complete) with no element
                    // collected; insertMany rejects an empty document list, so skip those.
                    .filter(batch -> !batch.isEmpty())
                    .flatMap(collection::insertMany));

    if (elements == Long.MAX_VALUE || elements == Integer.MAX_VALUE) {
        successFlowable.subscribe();
    } else {
        successFlowable.blockingSubscribe();
    }
}
/**
 * Drops {@code collection} and refills it with {@code elements} generated documents.
 *
 * <p>Generated documents are grouped into 500 ms time windows and every non-empty window is
 * written with one {@code insertMany} call. When {@code elements} equals {@link Long#MAX_VALUE}
 * or {@link Integer#MAX_VALUE} the pipeline is started with {@code subscribe()}; for any other
 * value it is run with {@code blockingSubscribe()}.
 *
 * @param elements number of documents to generate; nothing happens for values {@code <= 0}
 */
private void prepareRandomData(long elements) {
    if (elements <= 0) {
        return;
    }

    Flowable<Success> successFlowable = Flowable.fromPublisher(collection.drop())
            .ignoreElements()
            .andThen(
                    Flowable
                            .rangeLong(0L, elements)
                            .map(l -> NewsHarness.generate())
                            .buffer(500, TimeUnit.MILLISECONDS)
                            // A time window can close (or the stream can complete) with no
                            // element collected; insertMany rejects an empty document list,
                            // so skip those.
                            .filter(batch -> !batch.isEmpty())
                            .flatMap(collection::insertMany)
            );

    if (elements == Long.MAX_VALUE || elements == Integer.MAX_VALUE) {
        successFlowable.subscribe();
    } else {
        successFlowable.blockingSubscribe();
    }
}
/**
 * Provides a source of the capped collection {@code collectionName} in {@code database},
 * issuing the create-collection command first.
 *
 * @param database The database to use.
 * @param collectionName The name of the capped collection that should be created.
 * @param cappedCollectionSizeInBytes The size in bytes of the collection that should be created.
 * @param materializer The actor materializer to pre-materialize the restart source.
 * @return Returns a source emitting the created or retrieved collection.
 */
private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes,
        final ActorMaterializer materializer) {

    // After the create step has emitted, look the collection up and repeat that handle endlessly.
    final Source<MongoCollection, NotUsed> repeatedCollection =
            repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes)
                    .map(created -> database.getCollection(collectionName))
                    .flatMapConcat(Source::repeat);

    // Wrap in a restart-with-backoff source so a failure re-runs the whole create/lookup sequence.
    final Source<MongoCollection, NotUsed> withBackoff =
            RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0, () -> repeatedCollection);

    // Pre-materialize through a BroadcastHub so that an obtained collection is reused
    // until the stream fails, whereupon it gets recreated with backoff.
    return withBackoff.runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
}
/**
 * Builds a source that lazily issues a create-collection command for a capped collection and
 * treats an "already exists" command error as success.
 *
 * @param database the database in which the collection is created
 * @param collectionName the name of the capped collection
 * @param cappedCollectionSizeInBytes the size in bytes configured for the capped collection
 * @return a source emitting {@code Success} once the collection was created or already existed
 */
private static Source<Success, NotUsed> repeatableCreateCappedCollectionSource(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes) {

    final CreateCollectionOptions cappedOptions = new CreateCollectionOptions()
            .capped(true)
            .sizeInBytes(cappedCollectionSizeInBytes)
            .maxDocuments(1);

    // Source.lazily defers calling createCollection until the source is actually run.
    final Source<Success, NotUsed> lazyCreate = Source.lazily(
            () -> Source.fromPublisher(database.createCollection(collectionName, cappedOptions)))
            .mapMaterializedValue(ignored -> NotUsed.getInstance())
            .withAttributes(Attributes.inputBuffer(1, 1));

    // Only a MongoCommandException accepted by isCollectionAlreadyExistsError is recovered.
    return lazyCreate.recoverWithRetries(1, new PFBuilder<Throwable, Source<Success, NotUsed>>()
            .match(MongoCommandException.class,
                    MongoTimestampPersistence::isCollectionAlreadyExistsError,
                    alreadyExists -> Source.single(Success.SUCCESS))
            .build());
}
/**
 * Creates the given indices on a collection one after another.
 *
 * @param collectionName the name of the collection passed on to {@code createIndex}
 * @param indices the indices to create
 * @return a source concatenating the result of {@code createIndex} for each index, or an empty
 *         source if {@code indices} is empty
 */
private Source<Success, NotUsed> createIndices(final String collectionName, final List<Index> indices) {
    if (!indices.isEmpty()) {
        return Source.from(indices)
                .flatMapConcat(indexToCreate -> createIndex(collectionName, indexToCreate));
    }
    return Source.empty();
}
/**
 * Drops the given indices of a collection one after another.
 *
 * @param collectionName the name of the collection passed on to {@code dropIndex}
 * @param indices the indices to drop, as passed on to {@code dropIndex}
 * @return a source concatenating the result of {@code dropIndex} for each entry, or an empty
 *         source if {@code indices} is empty
 */
private Source<Success, NotUsed> dropIndices(final String collectionName, final List<String> indices) {
    if (!indices.isEmpty()) {
        return Source.from(indices)
                .flatMapConcat(indexToDrop -> dropIndex(collectionName, indexToDrop));
    }
    return Source.empty();
}
/**
 * Exposes the wrapped callback-style {@code drop} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code drop} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> drop() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.drop(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Builds the recovery function used when dropping an index: a {@code MongoCommandException}
 * accepted by {@code IndexOperations.isIndexNotFound} is logged at debug level and replaced by a
 * single {@code Success} element.
 *
 * @param indexDescription description of the index, used only in the log message
 * @return the partial function mapping the matched error to a successful source
 */
private static PartialFunction<Throwable, Source<Success, NotUsed>> buildDropIndexRecovery(
        final String indexDescription) {

    final PFBuilder<Throwable, Source<Success, NotUsed>> recoveryBuilder = new PFBuilder<>();
    return recoveryBuilder
            .match(MongoCommandException.class, IndexOperations::isIndexNotFound, indexNotFound -> {
                LOGGER.debug("Index <{}> could not be dropped because it does not exist (anymore).",
                        indexDescription);
                return Source.single(Success.SUCCESS);
            })
            .build();
}
/**
 * Returns a view of this upload publisher that emits the object id instead of {@code Success}.
 *
 * <p>{@code getObjectId()} and {@code getId()} delegate to this instance. Subscribing subscribes
 * to this instance and, for every {@code Success} it emits, forwards the current object id;
 * subscription, error and completion signals are passed through unchanged.
 *
 * @return the {@code ObjectId}-emitting publisher view
 */
GridFSUploadPublisher<ObjectId> withObjectId() {
    final GridFSUploadPublisherImpl delegate = this;
    return new GridFSUploadPublisher<ObjectId>() {
        @Override
        public ObjectId getObjectId() {
            return delegate.getObjectId();
        }

        @Override
        public BsonValue getId() {
            return delegate.getId();
        }

        @Override
        public void subscribe(final Subscriber<? super ObjectId> objectIdSub) {
            // Bridges Success signals from the delegate to ObjectId signals for the subscriber.
            final Subscriber<Success> bridge = new Subscriber<Success>() {
                @Override
                public void onSubscribe(final Subscription subscription) {
                    objectIdSub.onSubscribe(subscription);
                }

                @Override
                public void onNext(final Success ignored) {
                    objectIdSub.onNext(delegate.getObjectId());
                }

                @Override
                public void onError(final Throwable error) {
                    objectIdSub.onError(error);
                }

                @Override
                public void onComplete() {
                    objectIdSub.onComplete();
                }
            };
            delegate.subscribe(bridge);
        }
    };
}
/**
 * Exposes the wrapped callback-style {@code delete} operation as a {@code Publisher}.
 *
 * @param id the id handed to the wrapped {@code delete} call
 * @return a publisher built from the wrapped {@code delete} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> delete(final BsonValue id) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.delete(id, voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Walks through the delete examples 55-58: inserts five documents, then removes them with
 * {@code deleteMany} / {@code deleteOne} and checks the remaining document count after each step.
 */
@Test
public void testDeletions() throws Throwable {
//Start Example 55
Publisher<Success> insertManyPublisher = collection.insertMany(asList(
Document.parse("{ item: 'journal', qty: 25, size: { h: 14, w: 21, uom: 'cm' }, status: 'A' }"),
Document.parse("{ item: 'notebook', qty: 50, size: { h: 8.5, w: 11, uom: 'in' }, status: 'A' }"),
Document.parse("{ item: 'paper', qty: 100, size: { h: 8.5, w: 11, uom: 'in' }, status: 'D' }"),
Document.parse("{ item: 'planner', qty: 75, size: { h: 22.85, w: 30, uom: 'cm' }, status: 'D' }"),
Document.parse("{ item: 'postcard', qty: 45, size: { h: 10, w: 15.25, uom: 'cm' }, status: 'A' }")
));
// End Example 55
// Of the five documents, three have status 'A' and two have status 'D'.
// NOTE(review): assertSuccess is not shown here; presumably it subscribes and waits - confirm.
assertSuccess(insertManyPublisher);
assertSize(collection.find(), 5);
//Start Example 56
// An empty filter document is passed; the collection is expected to be empty afterwards.
Publisher<DeleteResult> deleteManyPublisher = collection.deleteMany(new Document());
//End Example 56
assertSuccess(deleteManyPublisher);
assertSize(collection.find(), 0);
// The same insert publisher is passed to assertSuccess a second time; the size checks below
// rely on the five documents being present again.
assertSuccess(insertManyPublisher);
//Start Example 57
deleteManyPublisher = collection.deleteMany(eq("status", "A"));
//End Example 57
assertSuccess(deleteManyPublisher);
// Five documents minus the three with status 'A'.
assertSize(collection.find(), 2);
//Start Example 58
Publisher<DeleteResult> deleteOnePublisher = collection.deleteOne(eq("status", "D"));
//End Example 58
assertSuccess(deleteOnePublisher);
// One of the two remaining status 'D' documents is expected to be gone.
assertSize(collection.find(), 1);
}
/**
 * Exposes the wrapped callback-style {@code rename} operation as a {@code Publisher}.
 *
 * @param id the id handed to the wrapped {@code rename} call
 * @param newFilename the new filename handed to the wrapped {@code rename} call
 * @return a publisher built from the wrapped {@code rename} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> rename(final ObjectId id, final String newFilename) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.rename(id, newFilename, voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style, session-bound {@code delete} operation as a
 * {@code Publisher}.
 *
 * @param clientSession the session whose {@code getWrapped()} value is handed to the wrapped call
 * @param id the id handed to the wrapped {@code delete} call
 * @return a publisher built from the wrapped {@code delete} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> delete(final ClientSession clientSession, final BsonValue id) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.delete(clientSession.getWrapped(), id,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code delete} operation as a {@code Publisher}.
 *
 * @param id the object id handed to the wrapped {@code delete} call
 * @return a publisher built from the wrapped {@code delete} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> delete(final ObjectId id) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.delete(id, voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style, session-bound {@code rename} operation as a
 * {@code Publisher}.
 *
 * @param clientSession the session whose {@code getWrapped()} value is handed to the wrapped call
 * @param id the id handed to the wrapped {@code rename} call
 * @param newFilename the new filename handed to the wrapped {@code rename} call
 * @return a publisher built from the wrapped {@code rename} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> rename(final ClientSession clientSession, final BsonValue id, final String newFilename) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.rename(clientSession.getWrapped(), id, newFilename,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code abort} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code abort} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> abort() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.abort(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code insertOne} operation as a {@code Publisher}.
 *
 * @param document the document handed to the wrapped {@code insertOne} call
 * @param options the options handed to the wrapped {@code insertOne} call
 * @return a publisher built from the wrapped {@code insertOne} call, whose {@code Void} callback
 *         is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> insertOne(final TDocument document, final InsertOneOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.insertOne(document, options, voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code insertMany} operation as a {@code Publisher}.
 *
 * @param documents the documents handed to the wrapped {@code insertMany} call
 * @param options the options handed to the wrapped {@code insertMany} call
 * @return a publisher built from the wrapped {@code insertMany} call, whose {@code Void} callback
 *         is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> insertMany(final List<? extends TDocument> documents, final InsertManyOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.insertMany(documents, options, voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style, session-bound {@code insertMany} operation as a
 * {@code Publisher}.
 *
 * @param clientSession the session whose {@code getWrapped()} value is handed to the wrapped call
 * @param documents the documents handed to the wrapped {@code insertMany} call
 * @param options the options handed to the wrapped {@code insertMany} call
 * @return a publisher built from the wrapped {@code insertMany} call, whose {@code Void} callback
 *         is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> insertMany(final ClientSession clientSession, final List<? extends TDocument> documents,
        final InsertManyOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.insertMany(clientSession.getWrapped(), documents, options,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code drop} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code drop} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> drop() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.drop(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style, session-bound {@code drop} operation as a
 * {@code Publisher}.
 *
 * @param clientSession the session whose {@code getWrapped()} value is handed to the wrapped call
 * @return a publisher built from the wrapped {@code drop} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> drop(final ClientSession clientSession) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.drop(clientSession.getWrapped(), voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code toCollection} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code toCollection} call, whose {@code Void}
 *         callback is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> toCollection() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.toCollection(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code dropIndex} operation (by index name) as a
 * {@code Publisher}.
 *
 * @param indexName the index name handed to the wrapped {@code dropIndex} call
 * @param dropIndexOptions the options handed to the wrapped {@code dropIndex} call
 * @return a publisher built from the wrapped {@code dropIndex} call, whose {@code Void} callback
 *         is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> dropIndex(final String indexName, final DropIndexOptions dropIndexOptions) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.dropIndex(indexName, dropIndexOptions,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code dropIndex} operation (by keys) as a
 * {@code Publisher}.
 *
 * @param keys the index keys handed to the wrapped {@code dropIndex} call
 * @param dropIndexOptions the options handed to the wrapped {@code dropIndex} call
 * @return a publisher built from the wrapped {@code dropIndex} call, whose {@code Void} callback
 *         is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> dropIndex(final Bson keys, final DropIndexOptions dropIndexOptions) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.dropIndex(keys, dropIndexOptions,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style, session-bound {@code dropIndex} operation (by keys) as a
 * {@code Publisher}.
 *
 * @param clientSession the session whose {@code getWrapped()} value is handed to the wrapped call
 * @param keys the index keys handed to the wrapped {@code dropIndex} call
 * @param dropIndexOptions the options handed to the wrapped {@code dropIndex} call
 * @return a publisher built from the wrapped {@code dropIndex} call, whose {@code Void} callback
 *         is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> dropIndex(final ClientSession clientSession, final Bson keys, final DropIndexOptions dropIndexOptions) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.dropIndex(clientSession.getWrapped(), keys, dropIndexOptions,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code renameCollection} operation as a {@code Publisher}.
 *
 * @param newCollectionNamespace the namespace handed to the wrapped {@code renameCollection} call
 * @param options the options handed to the wrapped {@code renameCollection} call
 * @return a publisher built from the wrapped {@code renameCollection} call, whose {@code Void}
 *         callback is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> renameCollection(final MongoNamespace newCollectionNamespace, final RenameCollectionOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.renameCollection(newCollectionNamespace, options,
                    voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Adapts a {@code Success} callback to the {@code Void} callback expected by operations that
 * produce no result.
 *
 * <p>On normal completion the given callback receives {@code Success.SUCCESS}. On failure it
 * receives a {@code null} result together with the throwable, so a result is never reported
 * alongside an error.
 *
 * @param callback the callback to notify with {@code Success.SUCCESS} or with the error
 * @return the results callback for an operation that returns null to signal success
 */
public static com.mongodb.async.SingleResultCallback<Void> voidToSuccessCallback(
        final com.mongodb.async.SingleResultCallback<Success> callback) {
    return new com.mongodb.async.SingleResultCallback<Void>() {
        @Override
        public void onResult(final Void result, final Throwable t) {
            if (t != null) {
                // Honour the callback contract: no result value when an error is reported.
                callback.onResult(null, t);
            } else {
                callback.onResult(Success.SUCCESS, null);
            }
        }
    };
}
/**
 * Exposes the wrapped callback-style {@code drop} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code drop} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> drop() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.drop(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code close} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code close} call, whose {@code Void} callback is
 *         adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> close() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.close(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Exposes the wrapped callback-style {@code toCollection} operation as a {@code Publisher}.
 *
 * @return a publisher built from the wrapped {@code toCollection} call, whose {@code Void}
 *         callback is adapted through {@code voidToSuccessCallback}
 */
@Override
public Publisher<Success> toCollection() {
    final Block<com.mongodb.async.SingleResultCallback<Success>> operation =
            resultCallback -> wrapped.toCollection(voidToSuccessCallback(resultCallback));
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(operation));
}