类com.mongodb.client.model.BulkWriteOptions源码实例Demo

下面列出了怎么用com.mongodb.client.model.BulkWriteOptions的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: quarkus   文件: CollectionManagementTest.java
@Test
void bulkWriteWithOptions() {
    ReactiveMongoDatabase database = client.getDatabase(DATABASE);
    ReactiveMongoCollection<Document> collection = database.getCollection("test");

    BulkWriteResult result = collection.bulkWrite(Arrays.asList(
            new InsertOneModel<>(new Document("_id", 4)),
            new InsertOneModel<>(new Document("_id", 5)),
            new InsertOneModel<>(new Document("_id", 6)),
            new UpdateOneModel<>(new Document("_id", 1),
                    new Document("$set", new Document("x", 2))),
            new DeleteOneModel<>(new Document("_id", 2)),
            new ReplaceOneModel<>(new Document("_id", 3),
                    new Document("_id", 3).append("x", 4))),
            new BulkWriteOptions().ordered(true)).await().indefinitely();

    assertThat(result.getDeletedCount()).isEqualTo(0);
    assertThat(result.getInsertedCount()).isEqualTo(3);

}
 
源代码2 项目: ditto   文件: MongoOpsUtil.java
private static Source<Optional<Throwable>, NotUsed> doDeleteByFilter(final MongoCollection<Document> collection,
        final Bson filter) {
    // https://stackoverflow.com/a/33164008
    // claims unordered bulk ops halve MongoDB load
    final List<WriteModel<Document>> writeModel =
            Collections.singletonList(new DeleteManyModel<>(filter));
    final BulkWriteOptions options = new BulkWriteOptions().ordered(false);
    return Source.fromPublisher(collection.bulkWrite(writeModel, options))
            .map(result -> {
                if (LOGGER.isDebugEnabled()) {
                    // in contrast to Bson, BsonDocument has meaningful toString()
                    final BsonDocument filterBsonDoc = BsonUtil.toBsonDocument(filter);
                    LOGGER.debug("Deleted <{}> documents from collection <{}>. Filter was <{}>.",
                            result.getDeletedCount(), collection.getNamespace(), filterBsonDoc);
                }
                return Optional.<Throwable>empty();
            })
            .recoverWithRetries(RETRY_ATTEMPTS, new PFBuilder<Throwable, Source<Optional<Throwable>, NotUsed>>()
                    .matchAny(throwable -> Source.single(Optional.of(throwable)))
                    .build());
}
 
源代码3 项目: ditto   文件: MongoSearchUpdaterFlow.java
private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(
        final List<AbstractWriteModel> abstractWriteModels) {
    final List<WriteModel<Document>> writeModels = abstractWriteModels.stream()
            .map(AbstractWriteModel::toMongo)
            .collect(Collectors.toList());
    return Source.fromPublisher(collection.bulkWrite(writeModels, new BulkWriteOptions().ordered(false)))
            .map(bulkWriteResult -> WriteResultAndErrors.success(abstractWriteModels, bulkWriteResult))
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<WriteResultAndErrors, NotUsed>>()
                    .match(MongoBulkWriteException.class, bulkWriteException ->
                            Source.single(WriteResultAndErrors.failure(abstractWriteModels, bulkWriteException))
                    )
                    .matchAny(error ->
                            Source.single(WriteResultAndErrors.unexpectedError(abstractWriteModels, error))
                    )
                    .build()
            );
}
 
源代码4 项目: core-ng-project   文件: MongoCollectionImpl.java
@Override
public long bulkDelete(List<?> ids) {
    var watch = new StopWatch();
    int size = ids.size();
    int deletedRows = 0;
    try {
        List<DeleteOneModel<T>> models = new ArrayList<>(size);
        for (Object id : ids) {
            models.add(new DeleteOneModel<>(Filters.eq("_id", id)));
        }
        BulkWriteResult result = collection().bulkWrite(models, new BulkWriteOptions().ordered(false));
        deletedRows = result.getDeletedCount();
        return deletedRows;
    } finally {
        long elapsed = watch.elapsed();
        ActionLogContext.track("mongo", elapsed, 0, deletedRows);
        logger.debug("bulkDelete, collection={}, ids={}, size={}, deletedRows={}, elapsed={}", collectionName, ids, size, deletedRows, elapsed);
        checkSlowOperation(elapsed);
    }
}
 
源代码5 项目: df_data_service   文件: MongoAdminClient.java
public MongoAdminClient importJsonFile(String fileNamePath) {
    int count = 0;
    int batch = 100;

    List<InsertOneModel<Document>> docs = new ArrayList<>();

    try (BufferedReader br = new BufferedReader(new FileReader(fileNamePath))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException fnfe) {
        fnfe.printStackTrace();
    }

    if (count > 0) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }

    return this;
}
 
源代码6 项目: df_data_service   文件: MongoAdminClient.java
public MongoAdminClient importJsonInputStream(InputStream fileInputStream) {
    int count = 0;
    int batch = 100;

    List<InsertOneModel<Document>> docs = new ArrayList<>();

    try (BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException fnfe) {
        fnfe.printStackTrace();
    }

    if (count > 0) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }

    return this;
}
 
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return RxObservables.create(Observables.observe(new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    }), observableAdapter);
}
 
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return new ObservableToPublisher<BulkWriteResult>(com.mongodb.async.client.Observables.observe(
            new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> callback) {
                    wrapped.bulkWrite(requests, options, callback);
                }
            }));
}
 
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return new ObservableToPublisher<BulkWriteResult>(com.mongodb.async.client.Observables.observe(
            new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> callback) {
                    wrapped.bulkWrite(clientSession.getWrapped(), requests, options, callback);
                }
            }));
}
 
源代码10 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public Uni<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends T>> requests,
        BulkWriteOptions options) {
    return Wrappers.toUni(collection.bulkWrite(requests, options));
}
 
源代码11 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
@Override
public Uni<BulkWriteResult> bulkWrite(ClientSession clientSession,
        List<? extends WriteModel<? extends T>> requests, BulkWriteOptions options) {
    return Wrappers.toUni(collection.bulkWrite(clientSession, requests, options));
}
 
源代码12 项目: ditto   文件: MongoSearchUpdaterFlowTest.java
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {

    new TestKit(actorSystem) {{

        // GIVEN: The persistence fails with an error on every write

        final MongoDatabase db = Mockito.mock(MongoDatabase.class);
        final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
        final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
        Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
        Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
                .thenReturn(publisher);

        // GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink

        final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);

        final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
                flow.start(1, 1, Duration.ZERO).to(Sink.ignore());

        final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
                RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);

        // WHEN: Many changes stream through MongoSearchUpdaterFlow

        final int numberOfChanges = 25;
        final CountDownLatch latch = new CountDownLatch(numberOfChanges);

        final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
        final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
        Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
        Source.repeat(Source.single(abstractWriteModel))
                .take(numberOfChanges)
                .buffer(1, OverflowStrategy.backpressure())
                .map(source -> {
                    latch.countDown();
                    return source;
                })
                .runWith(restartSink, ActorMaterializer.create(actorSystem));

        // THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream

        latch.await(5L, TimeUnit.SECONDS);
        assertThat(latch.getCount()).isZero();
    }};
}
 
源代码13 项目: mongo-java-driver-rx   文件: MongoCollectionImpl.java
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(requests, new BulkWriteOptions());
}
 
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(requests, new BulkWriteOptions());
}
 
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(clientSession, requests, new BulkWriteOptions());
}
 
源代码16 项目: quarkus   文件: ReactiveMongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options the options to apply to the bulk write operation
 * @return a {@link Uni} receiving the {@link BulkWriteResult}
 */
Uni<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends T>> requests,
        BulkWriteOptions options);
 
源代码17 项目: quarkus   文件: ReactiveMongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests the writes to execute
 * @param options the options to apply to the bulk write operation
 * @return a {@link Uni} receiving the {@link BulkWriteResult}
 */
Uni<BulkWriteResult> bulkWrite(ClientSession clientSession,
        List<? extends WriteModel<? extends T>> requests,
        BulkWriteOptions options);
 
源代码18 项目: mongo-java-driver-rx   文件: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return an Observable with a single element the BulkWriteResult
 */
Observable<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
 
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher with a single element the BulkWriteResult
 */
Publisher<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
 
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher with a single element the BulkWriteResult
 * @mongodb.server.release 3.6
 * @since 1.7
 */
Publisher<BulkWriteResult> bulkWrite(ClientSession clientSession, List<? extends WriteModel<? extends TDocument>> requests,
                                     BulkWriteOptions options);
 
 类所在包
 同包方法