下面列出了com.mongodb.client.model.BulkWriteOptions#akka.stream.OverflowStrategy 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Create a flow that delays its elements with exponential backoff and reset after no element passes through the
* stream for a fixed period of time. Stream elements represent failures and are logged as errors.
* <ul>
* <li>Emits when: upstream emits.</li>
* <li>Completes when: upstream completes.</li>
* <li>Cancels when: never - this is a part of a feedback loop.</li>
* <li>Fails when: {@code maxRestarts} elements pass through the stream.</li>
* </ul>
*
* @param minBackoff Minimum backoff duration.
* @param maxBackoff Maximum backoff duration.
* @param maxRestarts Maximum number of tolerated failures. Tolerate no failure if 0. Tolerate arbitrarily many
* failures if negative.
* @param recovery The period after which backoff is reset to {@code minBackoff} if no failure occurred.
* @param <E> Type of stream elements.
* @return A delayed flow.
*/
private static <E> Flow<E, E, NotUsed> backoff(final Duration minBackoff, final Duration maxBackoff,
final int maxRestarts, final Duration recovery) {
final Flow<E, E, NotUsed> neverCancelFlowWithErrorLogging = Flow.<E>create()
.log("resume-source-errors-flow")
.withAttributes(logLevels(logLevelInfo(), logLevelDebug(), logLevelInfo()))
.via(new NeverCancelFlow<>());
final Flow<E, E, NotUsed> upstream = maxRestarts < 0
? neverCancelFlowWithErrorLogging
: maxRestarts == 0
? neverCancelFlowWithErrorLogging.flatMapConcat(ResumeSource::failWithLimitsReached)
: neverCancelFlowWithErrorLogging.limit(maxRestarts);
return upstream.statefulMapConcat(() -> new StatefulBackoffFunction<>(minBackoff, maxBackoff, recovery))
.flatMapConcat(pair ->
Source.single(pair.first())
.delay(pair.second(), OverflowStrategy.backpressure())
);
}
/**
* Create a flow from seeds to either elements or failure with reason and the previous N elements that passed
* through the stream before the failure. For maximum information, even elements emitted 2 or more failures ago
* are kept in the look-behind queue.
* <ul>
* <li>Emits when: stream created by {@code resume} emits or fails.</li>
* <li>Completes when: a stream created by {@code resume} completes.</li>
* <li>Cancels when: downstream cancels.</li>
* <li>Fails when: never.</li>
* </ul>
*
* @param resume Creator of a stream of elements from a resumption seed.
* @param lookBehind How many elements to keep in memory to create the next seed on failure.
* @param <S> Type of seeds.
* @param <E> Type of elements.
* @return A never-failing flow.
*/
private static <S, E> Flow<S, Either<FailureWithLookBehind<E>, E>, NotUsed> resumeWithFailuresAppended(
final Function<S, Source<E, ?>> resume, final int lookBehind,
final Function<Throwable, Optional<Throwable>> resumeOrMapError) {
return Flow.<S>create()
.flatMapConcat(seed -> resume.apply(seed)
.<Envelope<E>>map(Element::new)
.concat(Source.single(new EndOfStream<>()))
.recoverWithRetries(1,
new PFBuilder<Throwable, Graph<SourceShape<Envelope<E>>, NotUsed>>()
.matchAny(error -> resumeOrMapError.apply(error)
.<Source<Envelope<E>, NotUsed>>map(Source::failed)
.orElseGet(() -> Source.single(new Error<>(error)))
)
.build())
)
.via(new EndStreamOnEOS<>())
.buffer(1, OverflowStrategy.backpressure())
.statefulMapConcat(() -> new StatefulLookBehindFunction<>(lookBehind));
}
private SourceQueueWithComplete<T> getSourceQueue(final ActorMaterializer materializer) {
// Log stream completion and failure at level ERROR because the stream is supposed to survive forever.
final Attributes streamLogLevels =
Attributes.logLevels(Attributes.logLevelDebug(), Attributes.logLevelError(),
Attributes.logLevelError());
return Source.<T>queue(getBufferSize(), OverflowStrategy.dropNew())
.map(this::incrementDequeueCounter)
.log("graph-actor-stream-1-dequeued", logger)
.withAttributes(streamLogLevels)
.via(Flow.fromFunction(this::beforeProcessMessage))
.log("graph-actor-stream-2-preprocessed", logger)
.withAttributes(streamLogLevels)
.via(processMessageFlow())
.log("graph-actor-stream-3-processed", logger)
.withAttributes(streamLogLevels)
.to(processedMessageSink())
.run(materializer);
}
@SuppressWarnings("unused")
private HttpPublisherActor(final Connection connection, final HttpPushFactory factory) {
super(connection);
this.factory = factory;
final ActorSystem system = getContext().getSystem();
final ConnectionConfig connectionConfig =
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(system.settings().config()))
.getConnectionConfig();
config = connectionConfig.getHttpPushConfig();
materializer = ActorMaterializer.create(getContext());
sourceQueue =
Source.<Pair<HttpRequest, HttpPushContext>>queue(config.getMaxQueueSize(), OverflowStrategy.dropNew())
.viaMat(factory.createFlow(system, log), Keep.left())
.toMat(Sink.foreach(this::processResponse), Keep.left())
.run(materializer);
}
public Result stream() {
Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
sourceActor.tell(ByteString.fromString("kiki"), null);
try {
MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sourceActor.tell(ByteString.fromString("foo"), null);
sourceActor.tell(ByteString.fromString("bar"), null);
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
new CreateTraceEntry().traceEntryMarker();
return null;
});
return ok().chunked(source);
}
@Lazy(false)
@Bean
MessageConsumer createAkkaMessageConsumer(Flux<String> messageFlux, ActorSystem actorSystem) {
MessageConsumer messageConsumer = new MessageConsumer("Akka");
// using Akka Streams to consume messages:
ActorMaterializer mat = ActorMaterializer.create(actorSystem);
Source.fromPublisher(messageFlux)
.buffer(100, OverflowStrategy.backpressure()) // 100 max buffer
.to(Sink.foreach(msg -> messageConsumer.accept(msg)))
.run(mat);
return messageConsumer;
}
/**
* Create a stream of snapshot revisions of all known entities.
* The stream fails if there is a failure requesting any stream or processing any stream element.
*
* @param config configuration of the persistence ID source.
* @param pubSubMediator the pub-sub mediator.
* @return source of entity IDs with revisions of their latest snapshots.
*/
public static Source<EntityIdWithRevision, NotUsed> create(final PersistenceIdsConfig config,
final ActorRef pubSubMediator) {
return Source.from(PERSISTENCE_STREAMING_ACTOR_PATHS)
.buffer(1, OverflowStrategy.backpressure())
.flatMapConcat(path -> buildResumeSource(config, pubSubMediator, path)
// recover to empty source to cleanup other resource types even on long-term failure
.recoverWithRetries(1, Throwable.class, Source::empty));
}
@Override
public <T> Flow<Pair<HttpRequest, T>, Pair<Try<HttpResponse>, T>, ?> createFlow(final ActorSystem system,
final LoggingAdapter log) {
final Http http = Http.get(system);
final ConnectionPoolSettings poolSettings = getConnectionPoolSettings(system);
final Flow<Pair<HttpRequest, T>, Pair<Try<HttpResponse>, T>, ?> flow;
if (null != httpsConnectionContext) {
final ConnectHttp connectHttpsWithCustomSSLContext =
ConnectHttp.toHostHttps(baseUri).withCustomHttpsContext(httpsConnectionContext);
// explicitly added <T> as in (some?) IntelliJ idea the line would show an error:
flow = http.<T>cachedHostConnectionPoolHttps(connectHttpsWithCustomSSLContext, poolSettings, log);
} else {
// explicitly added <T> as in (some?) IntelliJ idea the line would show an error:
// no SSL, hence no need for SSLContextCreator
flow = http.<T>cachedHostConnectionPool(ConnectHttp.toHost(baseUri), poolSettings, log);
}
return flow.buffer(parallelism, OverflowStrategy.backpressure());
}
private SourceQueue<ExternalMessage> materializeInboundStream(final int processorPoolSize) {
return Source.<ExternalMessage>queue(getBufferSize(), OverflowStrategy.dropNew())
// parallelize potentially CPU-intensive payload mapping on this actor's dispatcher
.mapAsync(processorPoolSize, externalMessage -> CompletableFuture.supplyAsync(
() -> mapInboundMessage(externalMessage),
getContext().getDispatcher())
)
.flatMapConcat(signalSource -> signalSource)
.toMat(Sink.foreach(this::handleIncomingMappedSignal), Keep.left())
.run(materializer);
}
private ActorRef createInternalKafkaProducer(final KafkaConnectionFactory factory,
final BiFunction<Done, Throwable, Done> completionOrFailureHandler) {
final Pair<ActorRef, CompletionStage<Done>> materializedFlowedValues =
Source.<ProducerMessage.Envelope<String, String, PassThrough>>actorRef(100,
OverflowStrategy.dropHead())
.via(factory.newFlow())
.toMat(KafkaPublisherActor.publishSuccessSink(), Keep.both())
.run(ActorMaterializer.create(getContext()));
materializedFlowedValues.second().handleAsync(completionOrFailureHandler);
return materializedFlowedValues.first();
}
private Pair<SourceQueueWithComplete<HttpRequest>, SinkQueueWithCancel<Try<HttpResponse>>> newSourceSinkQueues(
final HttpPushFactory underTest) {
return Source.<HttpRequest>queue(10, OverflowStrategy.dropNew())
.map(r -> Pair.create(r, null))
.viaMat(underTest.createFlow(actorSystem, actorSystem.log()), Keep.left())
.map(Pair::first)
.toMat(Sink.queue(), Keep.both())
.run(mat);
}
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {
new TestKit(actorSystem) {{
// GIVEN: The persistence fails with an error on every write
final MongoDatabase db = Mockito.mock(MongoDatabase.class);
final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
.thenReturn(publisher);
// GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink
final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);
final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
flow.start(1, 1, Duration.ZERO).to(Sink.ignore());
final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);
// WHEN: Many changes stream through MongoSearchUpdaterFlow
final int numberOfChanges = 25;
final CountDownLatch latch = new CountDownLatch(numberOfChanges);
final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
Source.repeat(Source.single(abstractWriteModel))
.take(numberOfChanges)
.buffer(1, OverflowStrategy.backpressure())
.map(source -> {
latch.countDown();
return source;
})
.runWith(restartSink, ActorMaterializer.create(actorSystem));
// THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream
latch.await(5L, TimeUnit.SECONDS);
assertThat(latch.getCount()).isZero();
}};
}