com.mongodb.client.model.BulkWriteOptions#akka.stream.OverflowStrategy源码实例Demo

下面列出了com.mongodb.client.model.BulkWriteOptions#akka.stream.OverflowStrategy 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: ditto   文件: ResumeSource.java
/**
 * Create a flow that delays its elements with exponential backoff and reset after no element passes through the
 * stream for a fixed period of time. Stream elements represent failures and are logged as errors.
 * <ul>
 * <li>Emits when: upstream emits.</li>
 * <li>Completes when: upstream completes.</li>
 * <li>Cancels when: never - this is a part of a feedback loop.</li>
 * <li>Fails when: {@code maxRestarts} elements pass through the stream.</li>
 * </ul>
 *
 * @param minBackoff Minimum backoff duration.
 * @param maxBackoff Maximum backoff duration.
 * @param maxRestarts Maximum number of tolerated failures. Tolerate no failure if 0. Tolerate arbitrarily many
 * failures if negative.
 * @param recovery The period after which backoff is reset to {@code minBackoff} if no failure occurred.
 * @param <E> Type of stream elements.
 * @return A delayed flow.
 */
private static <E> Flow<E, E, NotUsed> backoff(final Duration minBackoff, final Duration maxBackoff,
        final int maxRestarts, final Duration recovery) {
    final Flow<E, E, NotUsed> neverCancelFlowWithErrorLogging = Flow.<E>create()
            .log("resume-source-errors-flow")
            .withAttributes(logLevels(logLevelInfo(), logLevelDebug(), logLevelInfo()))
            .via(new NeverCancelFlow<>());

    final Flow<E, E, NotUsed> upstream = maxRestarts < 0
            ? neverCancelFlowWithErrorLogging
            : maxRestarts == 0
            ? neverCancelFlowWithErrorLogging.flatMapConcat(ResumeSource::failWithLimitsReached)
            : neverCancelFlowWithErrorLogging.limit(maxRestarts);

    return upstream.statefulMapConcat(() -> new StatefulBackoffFunction<>(minBackoff, maxBackoff, recovery))
            .flatMapConcat(pair ->
                    Source.single(pair.first())
                            .delay(pair.second(), OverflowStrategy.backpressure())
            );
}
 
源代码2 项目: ditto   文件: ResumeSource.java
/**
 * Create a flow from seeds to either elements or failure with reason and the previous N elements that passed
 * through the stream before the failure. For maximum information, even elements emitted 2 or more failures ago
 * are kept in the look-behind queue.
 * <ul>
 * <li>Emits when: stream created by {@code resume} emits or fails.</li>
 * <li>Completes when: a stream created by {@code resume} completes.</li>
 * <li>Cancels when: downstream cancels.</li>
 * <li>Fails when: never.</li>
 * </ul>
 *
 * @param resume Creator of a stream of elements from a resumption seed.
 * @param lookBehind How many elements to keep in memory to create the next seed on failure.
 * @param <S> Type of seeds.
 * @param <E> Type of elements.
 * @return A never-failing flow.
 */
private static <S, E> Flow<S, Either<FailureWithLookBehind<E>, E>, NotUsed> resumeWithFailuresAppended(
        final Function<S, Source<E, ?>> resume, final int lookBehind,
        final Function<Throwable, Optional<Throwable>> resumeOrMapError) {

    return Flow.<S>create()
            .flatMapConcat(seed -> resume.apply(seed)
                    .<Envelope<E>>map(Element::new)
                    .concat(Source.single(new EndOfStream<>()))
                    .recoverWithRetries(1,
                            new PFBuilder<Throwable, Graph<SourceShape<Envelope<E>>, NotUsed>>()
                                    .matchAny(error -> resumeOrMapError.apply(error)
                                            .<Source<Envelope<E>, NotUsed>>map(Source::failed)
                                            .orElseGet(() -> Source.single(new Error<>(error)))
                                    )
                                    .build())
            )
            .via(new EndStreamOnEOS<>())
            .buffer(1, OverflowStrategy.backpressure())
            .statefulMapConcat(() -> new StatefulLookBehindFunction<>(lookBehind));
}
 
源代码3 项目: ditto   文件: AbstractGraphActor.java
private SourceQueueWithComplete<T> getSourceQueue(final ActorMaterializer materializer) {
    // Log stream completion and failure at level ERROR because the stream is supposed to survive forever.
    final Attributes streamLogLevels =
            Attributes.logLevels(Attributes.logLevelDebug(), Attributes.logLevelError(),
                    Attributes.logLevelError());

    return Source.<T>queue(getBufferSize(), OverflowStrategy.dropNew())
            .map(this::incrementDequeueCounter)
            .log("graph-actor-stream-1-dequeued", logger)
            .withAttributes(streamLogLevels)
            .via(Flow.fromFunction(this::beforeProcessMessage))
            .log("graph-actor-stream-2-preprocessed", logger)
            .withAttributes(streamLogLevels)
            .via(processMessageFlow())
            .log("graph-actor-stream-3-processed", logger)
            .withAttributes(streamLogLevels)
            .to(processedMessageSink())
            .run(materializer);
}
 
源代码4 项目: ditto   文件: HttpPublisherActor.java
@SuppressWarnings("unused")
private HttpPublisherActor(final Connection connection, final HttpPushFactory factory) {
    super(connection);
    this.factory = factory;

    final ActorSystem system = getContext().getSystem();
    final ConnectionConfig connectionConfig =
            DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(system.settings().config()))
                    .getConnectionConfig();
    config = connectionConfig.getHttpPushConfig();

    materializer = ActorMaterializer.create(getContext());
    sourceQueue =
            Source.<Pair<HttpRequest, HttpPushContext>>queue(config.getMaxQueueSize(), OverflowStrategy.dropNew())
                    .viaMat(factory.createFlow(system, log), Keep.left())
                    .toMat(Sink.foreach(this::processResponse), Keep.left())
                    .run(materializer);
}
 
源代码5 项目: glowroot   文件: StreamController.java
public Result stream() {
    Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
            .mapMaterializedValue(sourceActor -> {
                sourceActor.tell(ByteString.fromString("kiki"), null);
                try {
                    MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                sourceActor.tell(ByteString.fromString("foo"), null);
                sourceActor.tell(ByteString.fromString("bar"), null);
                sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                new CreateTraceEntry().traceEntryMarker();
                return null;
            });
    return ok().chunked(source);
}
 
源代码6 项目: reactive-streams-in-java   文件: Application.java
@Lazy(false)
@Bean
MessageConsumer createAkkaMessageConsumer(Flux<String> messageFlux, ActorSystem actorSystem) {
    MessageConsumer messageConsumer = new MessageConsumer("Akka");
    // using Akka Streams to consume messages:
    ActorMaterializer mat = ActorMaterializer.create(actorSystem);

    Source.fromPublisher(messageFlux)
            .buffer(100, OverflowStrategy.backpressure()) // 100 max buffer
            .to(Sink.foreach(msg -> messageConsumer.accept(msg)))
            .run(mat);

    return messageConsumer;
}
 
源代码7 项目: ditto   文件: PersistenceIdSource.java
/**
 * Create a stream of snapshot revisions of all known entities.
 * The stream fails if there is a failure requesting any stream or processing any stream element.
 *
 * @param config configuration of the persistence ID source.
 * @param pubSubMediator the pub-sub mediator.
 * @return source of entity IDs with revisions of their latest snapshots.
 */
public static Source<EntityIdWithRevision, NotUsed> create(final PersistenceIdsConfig config,
        final ActorRef pubSubMediator) {
    return Source.from(PERSISTENCE_STREAMING_ACTOR_PATHS)
            .buffer(1, OverflowStrategy.backpressure())
            .flatMapConcat(path -> buildResumeSource(config, pubSubMediator, path)
                    // recover to empty source to cleanup other resource types even on long-term failure
                    .recoverWithRetries(1, Throwable.class, Source::empty));
}
 
源代码8 项目: ditto   文件: DefaultHttpPushFactory.java
@Override
public <T> Flow<Pair<HttpRequest, T>, Pair<Try<HttpResponse>, T>, ?> createFlow(final ActorSystem system,
        final LoggingAdapter log) {

    final Http http = Http.get(system);
    final ConnectionPoolSettings poolSettings = getConnectionPoolSettings(system);
    final Flow<Pair<HttpRequest, T>, Pair<Try<HttpResponse>, T>, ?> flow;
    if (null != httpsConnectionContext) {
        final ConnectHttp connectHttpsWithCustomSSLContext =
                ConnectHttp.toHostHttps(baseUri).withCustomHttpsContext(httpsConnectionContext);
        // explicitly added <T> as in (some?) IntelliJ idea the line would show an error:
        flow = http.<T>cachedHostConnectionPoolHttps(connectHttpsWithCustomSSLContext, poolSettings, log);
    } else {
        // explicitly added <T> as in (some?) IntelliJ idea the line would show an error:
        // no SSL, hence no need for SSLContextCreator
        flow = http.<T>cachedHostConnectionPool(ConnectHttp.toHost(baseUri), poolSettings, log);
    }
    return flow.buffer(parallelism, OverflowStrategy.backpressure());
}
 
源代码9 项目: ditto   文件: MessageMappingProcessorActor.java
private SourceQueue<ExternalMessage> materializeInboundStream(final int processorPoolSize) {
    return Source.<ExternalMessage>queue(getBufferSize(), OverflowStrategy.dropNew())
            // parallelize potentially CPU-intensive payload mapping on this actor's dispatcher
            .mapAsync(processorPoolSize, externalMessage -> CompletableFuture.supplyAsync(
                    () -> mapInboundMessage(externalMessage),
                    getContext().getDispatcher())
            )
            .flatMapConcat(signalSource -> signalSource)
            .toMat(Sink.foreach(this::handleIncomingMappedSignal), Keep.left())
            .run(materializer);
}
 
源代码10 项目: ditto   文件: KafkaPublisherActor.java
private ActorRef createInternalKafkaProducer(final KafkaConnectionFactory factory,
        final BiFunction<Done, Throwable, Done> completionOrFailureHandler) {

    final Pair<ActorRef, CompletionStage<Done>> materializedFlowedValues =
            Source.<ProducerMessage.Envelope<String, String, PassThrough>>actorRef(100,
                    OverflowStrategy.dropHead())
                    .via(factory.newFlow())
                    .toMat(KafkaPublisherActor.publishSuccessSink(), Keep.both())
                    .run(ActorMaterializer.create(getContext()));
    materializedFlowedValues.second().handleAsync(completionOrFailureHandler);
    return materializedFlowedValues.first();
}
 
源代码11 项目: ditto   文件: HttpPushFactoryTest.java
private Pair<SourceQueueWithComplete<HttpRequest>, SinkQueueWithCancel<Try<HttpResponse>>> newSourceSinkQueues(
        final HttpPushFactory underTest) {

    return Source.<HttpRequest>queue(10, OverflowStrategy.dropNew())
            .map(r -> Pair.create(r, null))
            .viaMat(underTest.createFlow(actorSystem, actorSystem.log()), Keep.left())
            .map(Pair::first)
            .toMat(Sink.queue(), Keep.both())
            .run(mat);
}
 
源代码12 项目: ditto   文件: MongoSearchUpdaterFlowTest.java
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {

    new TestKit(actorSystem) {{

        // GIVEN: The persistence fails with an error on every write

        final MongoDatabase db = Mockito.mock(MongoDatabase.class);
        final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
        final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
        Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
        Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
                .thenReturn(publisher);

        // GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink

        final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);

        final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
                flow.start(1, 1, Duration.ZERO).to(Sink.ignore());

        final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
                RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);

        // WHEN: Many changes stream through MongoSearchUpdaterFlow

        final int numberOfChanges = 25;
        final CountDownLatch latch = new CountDownLatch(numberOfChanges);

        final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
        final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
        Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
        Source.repeat(Source.single(abstractWriteModel))
                .take(numberOfChanges)
                .buffer(1, OverflowStrategy.backpressure())
                .map(source -> {
                    latch.countDown();
                    return source;
                })
                .runWith(restartSink, ActorMaterializer.create(actorSystem));

        // THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream

        latch.await(5L, TimeUnit.SECONDS);
        assertThat(latch.getCount()).isZero();
    }};
}