下面列出了 com.mongodb.client.model.BulkWriteOptions#akka.stream.javadsl.Sink 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Pushes one Kafka record whose value is not JSON through the command/response processing
 * pipeline and then expects no message on the {@code kafkaProducer} probe for 250 ms.
 */
@Test
void properlyHandleInvalidCommandMessage() {
    final TestKit kafkaProducer = new TestKit(system);
    final TestKit metricsService = new TestKit(system);
    final TestKit callContextProvider = new TestKit(system);

    // The record value is deliberately not valid JSON.
    final ConsumerRecord<String, String> malformedRecord =
            new ConsumerRecord<>("topic", 0, 0, "key", "NOT JSON");
    final Source<ConsumerRecord<String, String>, NotUsed> source = Source.single(malformedRecord);

    // Whatever the pipeline produces is discarded.
    final Sink<ProducerRecord<String, String>, NotUsed> sink =
            Sink.<ProducerRecord<String, String>>ignore()
                    .mapMaterializedValue(ignored -> NotUsed.getInstance());

    AriCommandResponseKafkaProcessor.commandResponseProcessing()
            .on(system)
            .withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
            .withCallContextProvider(callContextProvider.getRef())
            .withMetricsService(metricsService.getRef())
            .from(source)
            .to(sink)
            .run();

    // NOTE(review): within this method the kafkaProducer probe does not appear to be wired
    // into the pipeline (the sink ignores its input) — confirm this assertion checks what is intended.
    kafkaProducer.expectNoMsg(Duration.apply(250, TimeUnit.MILLISECONDS));
}
/**
 * Sends a single bogus websocket text message through the event processing pipeline and
 * expects the next message on the probe to be the completion record ("none"/"completed")
 * configured on the actor-ref sink.
 */
@Test
void verifyProcessingPipelineWorksAsExpectedForBogusMessages() {
    // One probe plays every role: sink target, call context provider and metrics service.
    final TestKit catchAllProbe = new TestKit(system);

    final Source<Message, NotUsed> source = Source.single(new Strict("invalid message from ws"));
    final Sink<ProducerRecord<String, String>, NotUsed> sink = Sink.actorRef(
            catchAllProbe.getRef(),
            new ProducerRecord<String, String>("none", "completed"));

    WebsocketMessageToProducerRecordTranslator.eventProcessing()
            .on(system)
            .withHandler(() -> catchAllProbe.getRef().tell("Application replaced", catchAllProbe.getRef()))
            .withCallContextProvider(catchAllProbe.getRef())
            .withMetricsService(catchAllProbe.getRef())
            .from(source)
            .to(sink)
            .run();

    final ProducerRecord<String, String> completionRecord = catchAllProbe.expectMsgClass(ProducerRecord.class);
    assertThat(completionRecord.topic(), is("none"));
    assertThat(completionRecord.value(), is("completed"));
}
/**
 * Starts the restore stream: lists the S3 entries for {@code tag}, loads their events, asks
 * {@code shardRegion} with each event, and forwards the conflated {@code Long} replies to this
 * actor using the "init" / "ack" / "done" acknowledgement messages.
 */
private void startRestore() {
    s3
        .list(tag)
        // Skip entries until the one BEFORE the first entry whose startTime >= offset,
        // because that preceding entry may have been only partially restored.
        .via(dropUntilNext(item -> S3.getStartInstant(item).toEpochMilli() >= offset, true))
        // Events are loaded by the part of the key after its last "/".
        .flatMapConcat(entry -> s3.loadEvents(entry.key().substring(entry.key().lastIndexOf("/") + 1)))
        .mapAsync(maxInFlight, event -> {
            log.debug("Replaying {}:{}", event.getPersistenceId(), event.getSequenceNr());
            return ask(shardRegion, event, timeout);
        })
        .map(reply -> {
            log.debug("Responded {}", reply);
            return (Long) reply;
        })
        // Author's intent: only save one lastOffset update per minute, and only the lowest one.
        // While elements pile up, the smaller of the two values is kept.
        .conflate((Long lowest, Long candidate) -> candidate > lowest ? lowest : candidate)
        .runWith(Sink.actorRefWithAck(self(), "init", "ack", "done", Failure::new), materializer);
}
/**
 * Creates those of the given indices that do not yet exist on the collection.
 *
 * @param collectionName the collection to inspect and create indices on.
 * @param indices the indices that should exist; when empty, a warning is logged and nothing is created.
 * @return a stage that completes with {@code Done} once the creation stream has run.
 */
private CompletionStage<Done> createNonExistingIndices(final String collectionName,
        final List<Index> indices) {
    if (!indices.isEmpty()) {
        return indexOperations.getIndicesExceptDefaultIndex(collectionName)
                .flatMapConcat(existing -> {
                    LOGGER.info("Create non-existing indices: Existing indices are: {}", existing);
                    final List<Index> missing = excludeIndices(indices, existing);
                    LOGGER.info("Indices to create are: {}", missing);
                    return createIndices(collectionName, missing);
                })
                .runWith(Sink.ignore(), materializer);
    }

    LOGGER.warn("No indices are defined, thus no indices are created.");
    return CompletableFuture.completedFuture(Done.getInstance());
}
/**
 * Starts the backup stream and returns the actor behavior that consumes it.
 * <p>
 * Events read by tag are grouped into chunks, each chunk is stored in S3, and the offset of
 * the chunk's last event is sent to this actor, which saves it and acknowledges.
 *
 * @param offset NOTE(review): not used in this method; the query below always starts from
 *               {@code NoOffset} — confirm whether that is intended.
 * @return the receive behavior handling "init", {@code Long} offsets, {@code Failure} and "done".
 */
private Receive startBackup(long offset) {
    query
        .eventsByTag(tag, NoOffset.getInstance())
        // create backups of max [N] elements, or at least every [T] on activity
        // FIXME write a stage that, instead of buffering each chunk into memory, creates sub-streams instead.
        .groupedWithin(eventChunkSize, eventChunkDuration)
        .filter(chunk -> !chunk.isEmpty())
        // After a chunk is stored, emit the offset of its last event.
        .mapAsync(4, chunk ->
            s3.store(tag, Vector.ofAll(chunk)).thenApply(done -> chunk.get(chunk.size() - 1).offset()))
        .runWith(Sink.actorRefWithAck(self(), "init", "ack", "done", Failure::new), materializer);

    return ReceiveBuilder.create()
        .matchEquals("init", init -> sender().tell("ack", self()))
        // Acknowledge an offset only after it has been saved.
        .match(Long.class, savedOffset ->
            pipe(s3.saveOffset(savedOffset).thenApply(done -> "ack"), context().dispatcher()).to(sender()))
        .match(Failure.class, failure -> {
            log.error("Stream failed, rethrowing", failure.cause());
            throw new RuntimeException(failure.cause());
        })
        .matchEquals("done", done -> {
            throw new IllegalStateException("eventsByTag completed, this should not happen. Killing actor, hoping for restart");
        })
        .build();
}
/**
 * Inserts four journal documents and two snapshot documents, then verifies that
 * {@code getJournalPids} yields exactly the four journal pids, in the order pid1..pid4.
 */
@Test
public void extractJournalPidsFromEventsAndNotSnapshots() {
    // journal entries, inserted out of order
    insert("test_journal", new Document().append("pid", "pid3").append("to", 2L));
    insert("test_journal", new Document().append("pid", "pid4").append("to", 2L));
    insert("test_journal", new Document().append("pid", "pid1").append("to", 1L));
    insert("test_journal", new Document().append("pid", "pid2").append("to", 1L));

    // snapshot entries, which must not show up in the result
    insert("test_snaps", new Document().append("pid", "pid5").append("sn", 3L));
    insert("test_snaps", new Document().append("pid", "pid6").append("sn", 4L));

    final List<String> journalPids = readJournal.getJournalPids(2, Duration.ZERO, materializer)
            .runWith(Sink.seq(), materializer)
            .toCompletableFuture()
            .join();

    assertThat(journalPids).containsExactly("pid1", "pid2", "pid3", "pid4");
}
/**
 * Sends the command to a per-request actor and wraps the integer found in that actor's HTTP
 * response body into a JSON object {@code {"allThingsCount": <count>}} returned with status OK.
 *
 * @param ctx the request context handed to the per-request actor.
 * @param command the count command to dispatch.
 * @return a route completing with the re-wrapped HTTP response.
 */
private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();

    // Dispatch the command; the per-request actor is given httpResponseFuture.
    Source.single(command)
            .to(Sink.actorRef(
                    createHttpPerRequestActor(ctx, httpResponseFuture),
                    AbstractHttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);

    // Read the whole body as UTF-8, parse it as an integer and build the JSON response.
    final CompletionStage<HttpResponse> allThingsCountHttpResponse =
            Source.fromCompletionStage(httpResponseFuture)
                    .flatMapConcat(response -> response.entity().getDataBytes())
                    .fold(ByteString.empty(), ByteString::concat)
                    .map(ByteString::utf8String)
                    .map(Integer::valueOf)
                    .map(thingsCount -> JsonObject.newBuilder().set("allThingsCount", thingsCount).build())
                    .map(json -> HttpResponse.create()
                            .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(json.toString()))
                            .withStatus(HttpStatusCode.OK.toInt()))
                    .runWith(Sink.head(), materializer);

    return completeWithFuture(allThingsCountHttpResponse);
}
/**
 * Writes six test things through {@code writePersistence}, one after another, and blocks
 * until all writes have completed. Five are derived from a thing with attribute {@code x = 5};
 * the sixth is derived from a copy with all attributes removed.
 */
private void insertTestThings() {
    final Thing withAttribute = ThingsModelFactory.newThingBuilder()
            .setId(ThingId.of("thing", "00"))
            .setRevision(1234L)
            .setPermissions(
                    AUTH_CONTEXT.getFirstAuthorizationSubject().orElseThrow(AssertionError::new),
                    Permission.READ)
            .setAttribute(JsonPointer.of("x"), JsonValue.of(5))
            .build();
    final Thing withoutAttributes = withAttribute.toBuilder().removeAllAttributes().build();

    writePersistence.writeThingWithAcl(template(withAttribute, 0, "a"))
            .concat(writePersistence.writeThingWithAcl(template(withAttribute, 1, "b")))
            .concat(writePersistence.writeThingWithAcl(template(withAttribute, 2, "a")))
            .concat(writePersistence.writeThingWithAcl(template(withAttribute, 3, "b")))
            .concat(writePersistence.writeThingWithAcl(template(withAttribute, 4, "c")))
            .concat(writePersistence.writeThingWithAcl(template(withoutAttributes, 5, "c")))
            .runWith(Sink.ignore(), materializer)
            .toCompletableFuture()
            .join();
}
/**
 * Builds the search-updater sink (updater flow, bulk-write acknowledgement flow, logging,
 * {@code Sink.ignore()}) and wraps it in a {@code RestartSink} with back-off taken from the
 * persistence configuration.
 *
 * @return the restarting sink.
 */
private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {
    final StreamConfig streamConfig = searchConfig.getStreamConfig();
    final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();

    final int parallelism = persistenceConfig.getParallelism();
    final int maxBulkSize = persistenceConfig.getMaxBulkSize();
    final Duration writeInterval = streamConfig.getWriteInterval();

    final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> sink =
            mongoSearchUpdaterFlow.start(parallelism, maxBulkSize, writeInterval)
                    .via(bulkWriteResultAckFlow.start())
                    .log("SearchUpdaterStream/BulkWriteResult")
                    .withAttributes(Attributes.logLevels(
                            Attributes.logLevelInfo(),
                            Attributes.logLevelWarning(),
                            Attributes.logLevelError()))
                    .to(Sink.ignore());

    final ExponentialBackOffConfig backOffConfig = persistenceConfig.getExponentialBackOffConfig();

    // The first argument is the minimum back-off. Passing getMax() for both bounds (as before)
    // made every restart wait the maximum delay, so the back-off never grew exponentially.
    return RestartSink.withBackoff(backOffConfig.getMin(), backOffConfig.getMax(), backOffConfig.getRandomFactor(),
            () -> sink);
}
/**
 * Reads the complete entity of the response and returns it decoded as UTF-8.
 * Blocks the calling thread until the entity stream has completed.
 *
 * @param httpResponse the response whose entity is read.
 * @return the entity bytes as a UTF-8 string.
 */
private String extractBody(HttpResponse httpResponse) {
    return httpResponse.entity().getDataBytes()
            // concatenate all chunks into one ByteString
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .runWith(Sink.head(), actorSystem)
            .toCompletableFuture()
            .join();
}
/**
 * Materializes the event pipeline: metrics are gathered on a wire tap, each message is turned
 * into producer records, the records are logged and then written to {@code sink}.
 * Stream errors are logged and resumed.
 *
 * @return the materializer the stream was started with.
 */
private static ActorMaterializer run(ActorSystem system, ActorRef callContextProvider, ActorRef metricsService,
        Source<Message, NotUsed> source, Sink<ProducerRecord<String, String>, NotUsed> sink,
        Runnable applicationReplacedHandler) {
    // Log the failure and keep the stream running.
    final Function<Throwable, Supervision.Directive> supervisorStrategy = cause -> {
        system.log().error(cause, cause.getMessage());
        return Supervision.resume();
    };

    final Config kafkaConfig = ConfigFactory.load().getConfig(SERVICE).getConfig(KAFKA);
    final String commandsTopic = kafkaConfig.getString(COMMANDS_TOPIC);
    final String eventsAndResponsesTopic = kafkaConfig.getString(EVENTS_AND_RESPONSES_TOPIC);

    final ActorMaterializer materializer = ActorMaterializer.create(
            ActorMaterializerSettings.create(system).withSupervisionStrategy(supervisorStrategy),
            system);

    source
            //.throttle(4 * 13, Duration.ofSeconds(1)) // Note: We die right now for calls/s >= 4.8
            .wireTap(Sink.foreach(message -> gatherMetrics(message, metricsService, callContextProvider)))
            .flatMapConcat(message -> generateProducerRecordFromEvent(
                    commandsTopic, eventsAndResponsesTopic, message, callContextProvider, system.log(),
                    applicationReplacedHandler))
            .log(">>> ARI EVENT", record -> record.value()).withAttributes(LOG_LEVELS)
            .to(sink)
            .run(materializer);

    return materializer;
}
/**
 * Wires the command/response processor between a restarting Kafka consumer source on the
 * commands topic and a plain Kafka producer sink, then runs it.
 *
 * @return the materializer returned by the processor's {@code run()}.
 */
private static ActorMaterializer runAriCommandResponseProcessor(
        Config kafkaConfig,
        ActorSystem system,
        ActorRef callContextProvider,
        ActorRef metricsService) {

    // Consumer: auto-commit enabled, starts from the earliest offset when none is stored.
    final ConsumerSettings<String, String> consumerSettings = ConsumerSettings
            .create(system, new StringDeserializer(), new StringDeserializer())
            .withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS))
            .withGroupId(kafkaConfig.getString(CONSUMER_GROUP))
            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final ProducerSettings<String, String> producerSettings = ProducerSettings
            .create(system, new StringSerializer(), new StringSerializer())
            .withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS));

    // Restart the consumer with a back-off between 5 s and 10 s and a random factor of 0.2.
    final Source<ConsumerRecord<String, String>, NotUsed> source = RestartSource.withBackoff(
            Duration.of(5, ChronoUnit.SECONDS),
            Duration.of(10, ChronoUnit.SECONDS),
            0.2,
            () -> Consumer
                    .plainSource(consumerSettings, Subscriptions.topics(kafkaConfig.getString(COMMANDS_TOPIC)))
                    .mapMaterializedValue(ignoredControl -> NotUsed.getInstance())
    );

    final Sink<ProducerRecord<String, String>, NotUsed> sink = Producer
            .plainSink(producerSettings)
            .mapMaterializedValue(ignoredDone -> NotUsed.getInstance());

    return AriCommandResponseKafkaProcessor.commandResponseProcessing()
            .on(system)
            .withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
            .withCallContextProvider(callContextProvider)
            .withMetricsService(metricsService)
            .from(source)
            .to(sink)
            .run();
}
/**
 * Runs the flow under test on a single element and returns the first element it emits,
 * blocking until that element is available.
 *
 * @param writeResultAndErrors the single input element.
 * @return the first output of the flow.
 */
private String runBulkWriteResultAckFlowAndGetFirstLogEntry(final WriteResultAndErrors writeResultAndErrors) {
    return Source.single(writeResultAndErrors)
            .via(underTest.start())
            .runWith(Sink.head(), materializer)
            .toCompletableFuture()
            .join();
}
/**
 * For each ARI event fixture: starts producer-record generation, answers the call context
 * request with "CALL_CONTEXT", and checks key and topic of the last produced record.
 */
@DisplayName("Verify processing of both channel and playback events results in the expected kafka producer record")
@ParameterizedTest
@ValueSource(strings = { stasisStartEvent, playbackFinishedEvent, recordingFinishedEvent })
void generateProducerRecordFromAllAriMessageTypes(String ariEvent) {
    new TestKit(system)
    {
        {
            // Run the generation asynchronously so this probe can answer the context request.
            final Future<Source<ProducerRecord<String, String>, NotUsed>> wsToKafkaProcessor = Future.of(
                    () -> AriEventProcessing.generateProducerRecordFromEvent(
                            fakeCommandsTopic,
                            fakeEventsAndResponsesTopic,
                            new Strict(ariEvent),
                            getRef(),
                            system.log(),
                            genApplicationReplacedHandler.apply(getRef()))
            );

            expectMsgClass(ProvideCallContext.class);
            reply(new CallContextProvided("CALL_CONTEXT"));

            // Materialize the resulting source and take its last element.
            final ProducerRecord<String, String> lastRecord = wsToKafkaProcessor
                    .flatMap(source -> Future.fromCompletableFuture(source.runWith(
                            Sink.last(),
                            ActorMaterializer.create(ActorMaterializerSettings.create(system), system))
                            .toCompletableFuture())
                    )
                    .await()
                    .get();

            assertThat(lastRecord.key(), is("CALL_CONTEXT"));
            assertThat(lastRecord.topic(), is(fakeEventsAndResponsesTopic));
        }
    };
}
/**
 * Exposes the hashtags of all tweets as a fan-out publisher.
 * The per-tweet hashtag collections are first merged into one set, so nothing is emitted
 * until the tweet stream has completed; the merged set is then flattened into single elements.
 */
Publisher<HashTag> hashtags() {
    return Source
            .fromPublisher(this.tweets())
            .map(Tweet::getHashTags)
            // union of the accumulated tags and the next tweet's tags
            .reduce((accumulated, next) -> {
                Set<HashTag> union = new HashSet<>();
                union.addAll(accumulated);
                union.addAll(next);
                return union;
            })
            .mapConcat(allTags -> allTags)
            .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), this.actorMaterializer);
}
/**
 * Runs the source on a temporary actor system, printing every element to standard out,
 * and blocks until the stream has finished.
 *
 * @param src the source to run.
 * @throws InterruptedException if the waiting thread is interrupted.
 * @throws ExecutionException if the stream fails.
 */
private void dumpSourceToStdOut(Source<?,NotUsed> src) throws InterruptedException, ExecutionException {
    final ActorSystem system = ActorSystem.create("QuickStart");
    try {
        final Materializer materializer = ActorMaterializer.create(system);
        final CompletionStage<Done> done = src.runWith(Sink.foreach(a -> System.out.println(a)), materializer);
        // Make it happen
        done.toCompletableFuture().get();
    } finally {
        // Terminate on failure and interruption too; previously the system was only
        // terminated when the stream completed successfully, leaking it otherwise.
        system.terminate();
    }
}
/**
 * Runs the source on a temporary actor system and returns the concatenation of all emitted
 * elements, blocking until the stream has finished.
 *
 * @param f the source to run.
 * @return every element appended to one string, in emission order.
 * @throws InterruptedException if the waiting thread is interrupted.
 * @throws ExecutionException if the stream fails.
 */
private String dumpSourceToString(Source<?,NotUsed> f) throws InterruptedException, ExecutionException {
    final ActorSystem system = ActorSystem.create("QuickStart");
    try {
        final Materializer materializer = ActorMaterializer.create(system);
        StringBuilder s = new StringBuilder();
        final CompletionStage<Done> done = f.runWith(Sink.foreach(a -> s.append(a)), materializer);
        done.toCompletableFuture().get();
        return s.toString();
    } finally {
        // Terminate on failure and interruption too; previously the system was only
        // terminated when the stream completed successfully, leaking it otherwise.
        system.terminate();
    }
}
/**
 * Materializes the given source, takes its first element off the front and completes the
 * returned CompletionStage with a source that replays that element followed by the rest of
 * the stream. If the source completes without emitting anything, the stage completes with
 * the remaining (tail) source as-is.
 *
 * No timeout is applied here; if one is needed it has to be added on the stream, on the
 * returned future, or both.
 *
 * This is useful for "fail early" handling of a stream result, e.g. building an http response
 * from a stream while being able to choose a different status code when the stream does not
 * get as far as producing a first element.
 */
public static <T> CompletionStage<Source<T,NotUsed>> awaitOne(Source<T,?> source, Materializer materializer) {
    return source.prefixAndTail(1).map(prefixAndRest -> {
        final Source<T, NotUsed> rest = prefixAndRest.second();
        if (!prefixAndRest.first().isEmpty()) {
            // Put the consumed first element back in front of the remainder.
            T first = prefixAndRest.first().get(0);
            return Source.single(first).concat(rest);
        }
        return rest;
    }).runWith(Sink.head(), materializer);
}
/**
 * Runs a single-element source into a test probe and expects that element followed by completion.
 */
@Test
public void test_a_source() {
    Source<Object, NotUsed> sourceUnderTest = Source.single("test");
    Sink<Object, TestSubscriber.Probe<Object>> probeSink = TestSink.probe(system);

    TestSubscriber.Probe<Object> probe = sourceUnderTest.runWith(probeSink, materializer);

    probe.request(1)
            .expectNext("test")
            .expectComplete();
}
/**
 * Accepts a text websocket that pushes the transfer stream to the client.
 * Anything the client sends is ignored.
 */
public WebSocket ws() {
    return WebSocket.Text.acceptOrResult(req ->
            wireTransferService
                    .transferStream()
                    .invoke()
                    // inbound messages go to Sink.ignore(), outbound messages come from the stream
                    .thenApply(source -> F.Either.Right(Flow.fromSinkAndSource(Sink.ignore(), source))));
}
/**
 * Creates the consumer and immediately starts a stream that reads events from the given topic,
 * forwards each one to the saga actor and commits the offsets in batches afterwards.
 *
 * @param actorSystem the actor system used for the stream and for the consumer configuration.
 * @param sagaShardRegionActor passed to the superclass.
 * @param metricsService passed to the superclass.
 * @param bootstrap_servers the Kafka bootstrap servers.
 * @param topic the topic to subscribe to.
 */
public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
        MetricsService metricsService, String bootstrap_servers, String topic) {
    super(actorSystem, sagaShardRegionActor, metricsService);

    // init consumer
    final Materializer materializer = ActorMaterializer.create(actorSystem);
    final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
    final ConsumerSettings<String, String> consumerSettings =
            ConsumerSettings
                    .create(consumerConfig, new StringDeserializer(), new StringDeserializer())
                    .withBootstrapServers(bootstrap_servers)
                    .withGroupId(groupId)
                    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
                    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                    // These properties expect a class name; the previous literal
                    // "StringDeserializer.class" does not name any class.
                    .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
                    .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
            // Hand each event to the saga actor; emit its offset once that has completed.
            .mapAsync(20, event -> {
                BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId());
                }
                return sendSagaActor(bean).thenApply(done -> event.committableOffset());
            })
            // Aggregate up to 100 offsets into one commit batch.
            .batch(
                    100,
                    ConsumerMessage::createCommittableOffsetBatch,
                    ConsumerMessage.CommittableOffsetBatch::updated
            )
            .mapAsync(20, offset -> offset.commitJavadsl())
            .to(Sink.ignore())
            .run(materializer);
}
/**
 * Cancels the body stream of a streamed HTTP message by subscribing to it with a sink
 * that cancels immediately.
 *
 * @param msg the message to cancel; must be a {@code StreamedHttpMessage}.
 * @throws IllegalArgumentException if {@code msg} is of any other type.
 */
public void cancelStreamedMessage(Object msg) {
    if (!(msg instanceof StreamedHttpMessage)) {
        throw new IllegalArgumentException("Unknown message type: " + msg);
    }
    Source.fromPublisher((StreamedHttpMessage) msg).runWith(Sink.<HttpContent>cancelled(), materializer);
}
/**
 * Creates a sink that pre-enforces every incoming dispatch and, through the callback given to
 * {@code preEnforce}, sends the dispatch's message to the search actor path via the pub-sub
 * mediator, using the dispatch's sender as the sender.
 *
 * @param pubSubMediator the mediator that receives the {@code send} request.
 * @param preEnforcer the pre-enforcer handed to {@code preEnforce}.
 * @return the sink.
 */
private static Sink<ImmutableDispatch, ?> searchActorSink(final ActorRef pubSubMediator,
        final PreEnforcer preEnforcer) {
    return Sink.foreach(incoming ->
            preEnforce(incoming, preEnforcer, enforced ->
                    pubSubMediator.tell(
                            DistPubSubAccess.send(SEARCH_ACTOR_PATH, enforced.getMessage()),
                            enforced.getSender())
            )
    );
}
/**
 * Streams the response of {@code url} chunk by chunk into a temporary file and verifies that
 * the request stage completes within five seconds.
 */
@Test
public void givenMultigigabyteResponseConsumeWithStreams() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    final ActorSystem system = ActorSystem.create();
    final ActorMaterializer materializer = ActorMaterializer.create(system);
    final Path path = Files.createTempFile("tmp_", ".out");
    WSClient ws = play.test.WSTestClient.newClient(port);
    log.info("Starting test server on url: " + url);
    ws.url(url)
        .stream()
        .thenAccept(
            response -> {
                try {
                    OutputStream outputStream = java.nio.file.Files.newOutputStream(path);
                    Sink<ByteString, CompletionStage<Done>> outputWriter =
                        Sink.foreach(bytes -> {
                            log.info("Reponse: " + bytes.utf8String());
                            outputStream.write(bytes.toArray());
                        });
                    response.getBodyAsSource()
                        .runWith(outputWriter, materializer)
                        // Close the file once the body stream has finished or failed;
                        // previously the output stream was never closed.
                        .whenComplete((done, failure) -> {
                            try {
                                outputStream.close();
                            } catch (IOException e) {
                                log.error("An error happened while closing the output stream", e);
                            }
                        });
                } catch (IOException e) {
                    log.error("An error happened while opening the output stream", e);
                }
            })
        .whenComplete((value, error) -> latch.countDown());
    log.debug(
        "Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
    latch.await(5, TimeUnit.SECONDS );
    assertEquals(0, latch.getCount());
    log.debug("All requests have been completed. Exiting test.");
}
/**
 * Convert this enforcement provider into a stream of enforcement tasks.
 * <p>
 * The returned graph has the shape of a flow. Incoming contextual messages are split by a multiplexer built
 * with {@code Filter.multiplexBy} using {@code tryToMapMessage(this::mapToHandledClass)}. Elements on the
 * first outlet are turned into an {@code EnforcementTask} via {@code buildEnforcementTask}; elements on the
 * second outlet are drained into {@code Sink.ignore()}.
 *
 * @param preEnforcer failable future to execute before the actual enforcement.
 * @return the stream.
 */
@SuppressWarnings("unchecked") // due to GraphDSL usage
default Graph<FlowShape<Contextual<WithDittoHeaders>, EnforcementTask>, NotUsed> createEnforcementTask(
final PreEnforcer preEnforcer
) {
// splits the input into messages mapped by mapToHandledClass (out0) and the remaining ones (out1)
final Graph<FanOutShape2<Contextual<WithDittoHeaders>, Contextual<T>, Contextual<WithDittoHeaders>>, NotUsed>
multiplexer = Filter.multiplexBy(contextual -> contextual.tryToMapMessage(this::mapToHandledClass));
return GraphDSL.create(builder -> {
final FanOutShape2<Contextual<WithDittoHeaders>, Contextual<T>, Contextual<WithDittoHeaders>> fanout =
builder.add(multiplexer);
final Flow<Contextual<T>, EnforcementTask, NotUsed> enforcementFlow =
Flow.fromFunction(contextual -> buildEnforcementTask(contextual, preEnforcer));
// by default, ignore unhandled messages:
final SinkShape<Contextual<WithDittoHeaders>> unhandledSink = builder.add(Sink.ignore());
final FlowShape<Contextual<T>, EnforcementTask> enforcementShape = builder.add(enforcementFlow);
// out0 -> enforcement flow, out1 -> ignored
builder.from(fanout.out0()).toInlet(enforcementShape.in());
builder.from(fanout.out1()).to(unhandledSink);
// expose the multiplexer's inlet and the enforcement flow's outlet as the resulting flow
return FlowShape.of(fanout.in(), enforcementShape.out());
});
}
/**
 * Drops every existing index (the default index is not considered) that is not among the
 * defined indices.
 *
 * @param collectionName the collection whose indices are inspected.
 * @param definedIndices the indices that are supposed to exist.
 * @return a stage that completes with {@code Done} once the drop stream has run.
 */
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
    return getIndicesExceptDefaultIndex(collectionName)
            .flatMapConcat(existing -> {
                LOGGER.info("Drop undefined indices - Existing indices are: {}", existing);
                final List<String> undefinedNames = getUndefinedIndexNames(existing, definedIndices);
                LOGGER.info("Dropping undefined indices: {}", undefinedNames);
                return dropIndices(collectionName, undefinedNames);
            })
            .runWith(Sink.ignore(), materializer);
}
/**
 * Loads the last known written offset by downloading "_lastOffset" and parsing its content
 * as a long. Any failure of the download or of the parsing (not only a missing object)
 * results in 0.
 */
public CompletionStage<Long> loadOffset() {
    return download("_lastOffset")
        // join all downloaded chunks before parsing
        .reduce((head, tail) -> head.concat(tail))
        .map(content -> Long.parseLong(content.utf8String()))
        .recoverWith(new PFBuilder<Throwable, Source<Long,NotUsed>>()
            .matchAny(failure -> Source.single(0L)) // not found -> start at 0
            .build()
        )
        .runWith(Sink.head(), materializer);
}
/**
 * Builds a streamed HTTP response with status OK whose body consists of the given strings,
 * each encoded as UTF-8 into its own content chunk.
 *
 * @param version the HTTP version of the response.
 * @param body the body chunks, in order.
 * @param contentLength the value to set as the response's content length.
 * @return the streamed response.
 */
public StreamedHttpResponse createStreamedResponse(HttpVersion version, List<String> body, long contentLength) {
    final Charset utf8 = Charset.forName("utf-8");
    final List<HttpContent> chunks = new ArrayList<>();
    for (String part : body) {
        chunks.add(new DefaultHttpContent(Unpooled.copiedBuffer(part, utf8)));
    }

    final Publisher<HttpContent> bodyPublisher = Source.from(chunks)
            .runWith(Sink.<HttpContent>asPublisher(AsPublisher.WITH_FANOUT), materializer);

    final StreamedHttpResponse response =
            new DefaultStreamedHttpResponse(version, HttpResponseStatus.OK, bodyPublisher);
    HttpUtil.setContentLength(response, contentLength);
    return response;
}
/**
 * Connects the given source to the bus flow, remembers the resulting kill switch under the
 * source, and returns a handle that removes this upstream again.
 */
@Override
public Removable addUpstream(Source<Object, NotUsed> publisher) {
    // Keep the flow's materialized value (the kill switch); the flow's output is discarded.
    final UniqueKillSwitch upstreamSwitch = publisher
            .viaMat(busFlow, Keep.right())
            .to(Sink.ignore())
            .run(mat);
    subscriptions.put(publisher, upstreamSwitch);

    return () -> AkkaHubProxy.this.removeUpstream(publisher);
}
/**
 * Writes two timestamps and verifies that the collection still reports a count of exactly one.
 */
@Test
public void ensureCollectionIsCapped() throws Exception {
    final MongoCollection<Document> cappedCollection = syncPersistence.getCollection()
            .runWith(Sink.head(), materializer)
            .toCompletableFuture()
            .get();

    // two writes in a row
    runBlocking(syncPersistence.setTimestamp(Instant.now()));
    runBlocking(syncPersistence.setTimestamp(Instant.now()));

    assertThat(runBlocking(Source.fromPublisher(cappedCollection.count()))).containsExactly(1L);
}