akka.stream.javadsl.Sink Source Code Examples

Listed below are example usages of akka.stream.javadsl.Sink; you can also follow the links to view the full source code on GitHub.
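
Before the project samples, here is a minimal, self-contained sketch of the basic pattern every example below relies on: a Source is run into a Sink, and the Sink's materialized value reports completion. The system name and printed values are illustrative only and are not taken from any of the projects listed.

final ActorSystem system = ActorSystem.create("sink-demo");
final ActorMaterializer materializer = ActorMaterializer.create(system);

// Sink.foreach materializes into a CompletionStage<Done> that completes when the stream ends.
final CompletionStage<Done> done = Source.range(1, 3)
        .runWith(Sink.foreach(System.out::println), materializer);

done.thenRun(() -> system.terminate());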

@Test
void properlyHandleInvalidCommandMessage() {
	final TestKit kafkaProducer = new TestKit(system);
	final TestKit metricsService = new TestKit(system);
	final TestKit callContextProvider = new TestKit(system);

	final ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
			"key", "NOT JSON");
	final Source<ConsumerRecord<String, String>, NotUsed> source = Source.single(consumerRecord);
	final Sink<ProducerRecord<String, String>, NotUsed> sink = Sink.<ProducerRecord<String, String>>ignore()
			.mapMaterializedValue(q -> NotUsed.getInstance());

	AriCommandResponseKafkaProcessor.commandResponseProcessing()
			.on(system)
			.withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
			.withCallContextProvider(callContextProvider.getRef())
			.withMetricsService(metricsService.getRef())
			.from(source)
			.to(sink)
			.run();

	kafkaProducer.expectNoMsg(Duration.apply(250, TimeUnit.MILLISECONDS));
}
 
@Test
void verifyProcessingPipelineWorksAsExpectedForBogusMessages() {

	final TestKit catchAllProbe = new TestKit(system);

	final Source<Message, NotUsed> source = Source.single(new Strict("invalid message from ws"));
	final Sink<ProducerRecord<String, String>, NotUsed> sink = Sink.actorRef(catchAllProbe.getRef(), new ProducerRecord<String, String>("none", "completed"));

	WebsocketMessageToProducerRecordTranslator.eventProcessing()
			.on(system)
			.withHandler(() -> catchAllProbe.getRef().tell("Application replaced", catchAllProbe.getRef()))
			.withCallContextProvider(catchAllProbe.getRef())
			.withMetricsService(catchAllProbe.getRef())
			.from(source)
			.to(sink)
			.run();

	final ProducerRecord<String, String> completeMsg = catchAllProbe.expectMsgClass(ProducerRecord.class);
	assertThat(completeMsg.topic(), is("none"));
	assertThat(completeMsg.value(), is("completed"));
}
 
Example 3  Project: ts-reaktive  File: S3Restore.java
private void startRestore() {
    s3
    .list(tag)
    // skip entries until the one just BEFORE the first entry where startTime >= offset (that earlier entry may have been only partially restored)
    .via(dropUntilNext(l -> S3.getStartInstant(l).toEpochMilli() >= offset, true))
    .flatMapConcat(entry -> s3.loadEvents(entry.key().substring(entry.key().lastIndexOf("/") + 1)))
    .mapAsync(maxInFlight, e -> {
        log.debug("Replaying {}:{}", e.getPersistenceId(), e.getSequenceNr());
        return ask(shardRegion, e, timeout);
    })
    .map(resp -> {
        log.debug("Responded {}", resp);
        return (Long) resp;
    })
    // only save one lastOffset update per minute, and only the lowest one
    .conflate((Long l1, Long l2) -> l1 < l2 ? l1 : l2)
    .runWith(Sink.actorRefWithAck(self(), "init", "ack", "done", Failure::new), materializer);
}
 
Example 4  Project: ditto  File: IndexInitializer.java
private CompletionStage<Done> createNonExistingIndices(final String collectionName,
        final List<Index> indices) {
    if (indices.isEmpty()) {
        LOGGER.warn("No indices are defined, thus no indices are created.");
        return CompletableFuture.completedFuture(Done.getInstance());
    }
    return indexOperations.getIndicesExceptDefaultIndex(collectionName)
            .flatMapConcat(
                    existingIndices -> {
                        LOGGER.info("Create non-existing indices: Existing indices are: {}", existingIndices);
                        final List<Index> indicesToCreate = excludeIndices(indices, existingIndices);
                        LOGGER.info("Indices to create are: {}", indicesToCreate);
                        return createIndices(collectionName, indicesToCreate);
                    })
            .runWith(Sink.ignore(), materializer);
}
 
Example 5  Project: ts-reaktive  File: S3Backup.java
private Receive startBackup(long offset) {
    query
        .eventsByTag(tag, NoOffset.getInstance())
        // create backups of max [N] elements, or at least every [T] on activity
        // FIXME write a stage that, instead of buffering each chunk into memory, creates sub-streams instead.
        .groupedWithin(eventChunkSize, eventChunkDuration)
        .filter(list -> list.size() > 0)
        .mapAsync(4, list -> s3.store(tag, Vector.ofAll(list)).thenApply(done -> list.get(list.size() - 1).offset()))
        .runWith(Sink.actorRefWithAck(self(), "init", "ack", "done", Failure::new), materializer);
    
    return ReceiveBuilder.create()
        .matchEquals("init", msg -> sender().tell("ack", self()))
        .match(Long.class, l -> pipe(s3.saveOffset(l).thenApply(done -> "ack"), context().dispatcher()).to(sender()))
        .match(Failure.class, msg -> {
            log.error("Stream failed, rethrowing", msg.cause());
            throw new RuntimeException(msg.cause());
        })
        .matchEquals("done", msg -> { throw new IllegalStateException("eventsByTag completed, this should not happen. Killing actor, hoping for restart"); })
        .build();
}
 
Example 6  Project: ditto  File: MongoReadJournalIT.java
@Test
public void extractJournalPidsFromEventsAndNotSnapshots() {
    insert("test_journal", new Document().append("pid", "pid3").append("to", 2L));
    insert("test_journal", new Document().append("pid", "pid4").append("to", 2L));
    insert("test_journal", new Document().append("pid", "pid1").append("to", 1L));
    insert("test_journal", new Document().append("pid", "pid2").append("to", 1L));
    insert("test_snaps", new Document().append("pid", "pid5").append("sn", 3L));
    insert("test_snaps", new Document().append("pid", "pid6").append("sn", 4L));

    final List<String> pids =
            readJournal.getJournalPids(2, Duration.ZERO, materializer)
                    .runWith(Sink.seq(), materializer)
                    .toCompletableFuture().join();

    assertThat(pids).containsExactly("pid1", "pid2", "pid3", "pid4");
}
 
Example 7  Project: ditto  File: StatsRoute.java
private Route handleSudoCountThingsPerRequest(final RequestContext ctx, final SudoCountThings command) {
    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();

    Source.single(command)
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
                    AbstractHttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);

    final CompletionStage<HttpResponse> allThingsCountHttpResponse = Source.fromCompletionStage(httpResponseFuture)
            .flatMapConcat(httpResponse -> httpResponse.entity().getDataBytes())
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .map(Integer::valueOf)
            .map(count -> JsonObject.newBuilder().set("allThingsCount", count).build())
            .map(jsonObject -> HttpResponse.create()
                    .withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(jsonObject.toString()))
                    .withStatus(HttpStatusCode.OK.toInt()))
            .runWith(Sink.head(), materializer);

    return completeWithFuture(allThingsCountHttpResponse);
}
 
Example 8  Project: ditto  File: SearchActorIT.java
private void insertTestThings() {
    final Thing baseThing = ThingsModelFactory.newThingBuilder()
            .setId(ThingId.of("thing", "00"))
            .setRevision(1234L)
            .setPermissions(AUTH_CONTEXT.getFirstAuthorizationSubject().orElseThrow(AssertionError::new),
                    Permission.READ)
            .setAttribute(JsonPointer.of("x"), JsonValue.of(5))
            .build();

    final Thing irrelevantThing = baseThing.toBuilder().removeAllAttributes().build();

    writePersistence.writeThingWithAcl(template(baseThing, 0, "a"))
            .concat(writePersistence.writeThingWithAcl(template(baseThing, 1, "b")))
            .concat(writePersistence.writeThingWithAcl(template(baseThing, 2, "a")))
            .concat(writePersistence.writeThingWithAcl(template(baseThing, 3, "b")))
            .concat(writePersistence.writeThingWithAcl(template(baseThing, 4, "c")))
            .concat(writePersistence.writeThingWithAcl(template(irrelevantThing, 5, "c")))
            .runWith(Sink.ignore(), materializer)
            .toCompletableFuture()
            .join();
}
 
Example 9  Project: ditto  File: SearchUpdaterStream.java
private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {
    final StreamConfig streamConfig = searchConfig.getStreamConfig();
    final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();

    final int parallelism = persistenceConfig.getParallelism();
    final int maxBulkSize = persistenceConfig.getMaxBulkSize();
    final Duration writeInterval = streamConfig.getWriteInterval();
    final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> sink =
            mongoSearchUpdaterFlow.start(parallelism, maxBulkSize, writeInterval)
                    .via(bulkWriteResultAckFlow.start())
                    .log("SearchUpdaterStream/BulkWriteResult")
                    .withAttributes(Attributes.logLevels(
                            Attributes.logLevelInfo(),
                            Attributes.logLevelWarning(),
                            Attributes.logLevelError()))
                    .to(Sink.ignore());

    final ExponentialBackOffConfig backOffConfig = persistenceConfig.getExponentialBackOffConfig();

    return RestartSink.withBackoff(backOffConfig.getMax(), backOffConfig.getMax(), backOffConfig.getRandomFactor(),
            () -> sink);
}
 
Example 10  Project: mutual-tls-ssl  File: AkkaHttpClientService.java
private String extractBody(HttpResponse httpResponse) {
    return httpResponse.entity()
            .getDataBytes()
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .runWith(Sink.head(), actorSystem)
            .toCompletableFuture()
            .join();
}
 
private static ActorMaterializer run(ActorSystem system, ActorRef callContextProvider, ActorRef metricsService,
		Source<Message, NotUsed> source, Sink<ProducerRecord<String, String>, NotUsed> sink,
		Runnable applicationReplacedHandler) {
	final Function<Throwable, Supervision.Directive> supervisorStrategy = t -> {
		system.log().error(t, t.getMessage());
		return Supervision.resume();
	};

	final Config kafkaConfig = ConfigFactory.load().getConfig(SERVICE).getConfig(KAFKA);
	final String commandsTopic = kafkaConfig.getString(COMMANDS_TOPIC);
	final String eventsAndResponsesTopic = kafkaConfig.getString(EVENTS_AND_RESPONSES_TOPIC);

	final ActorMaterializer materializer = ActorMaterializer.create(
			ActorMaterializerSettings.create(system).withSupervisionStrategy(supervisorStrategy),
			system);

	source
			//.throttle(4 * 13, Duration.ofSeconds(1)) // Note: We die right now for calls/s >= 4.8
			.wireTap(Sink.foreach(msg -> gatherMetrics(msg, metricsService, callContextProvider)))
			.flatMapConcat((msg) -> generateProducerRecordFromEvent(commandsTopic, eventsAndResponsesTopic, msg, callContextProvider, system.log(),
					applicationReplacedHandler))
			.log(">>>   ARI EVENT", record -> record.value()).withAttributes(LOG_LEVELS)
			.to(sink)
			.run(materializer);

	return materializer;
}
 
Example 12  Project: ari-proxy  File: Main.java
private static ActorMaterializer runAriCommandResponseProcessor(
		Config kafkaConfig,
		ActorSystem system,
		ActorRef callContextProvider,
		ActorRef metricsService) {
	final ConsumerSettings<String, String> consumerSettings = ConsumerSettings
			.create(system, new StringDeserializer(), new StringDeserializer())
			.withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS))
			.withGroupId(kafkaConfig.getString(CONSUMER_GROUP))
			.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
			.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

	final ProducerSettings<String, String> producerSettings = ProducerSettings
			.create(system, new StringSerializer(), new StringSerializer())
			.withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS));

	final Source<ConsumerRecord<String, String>, NotUsed> source = RestartSource.withBackoff(
			Duration.of(5, ChronoUnit.SECONDS),
			Duration.of(10, ChronoUnit.SECONDS),
			0.2,
			() -> Consumer
					.plainSource(consumerSettings, Subscriptions.topics(kafkaConfig.getString(COMMANDS_TOPIC)))
					.mapMaterializedValue(control -> NotUsed.getInstance())
	);

	final Sink<ProducerRecord<String, String>, NotUsed> sink = Producer
			.plainSink(producerSettings)
			.mapMaterializedValue(done -> NotUsed.getInstance());

	return AriCommandResponseKafkaProcessor.commandResponseProcessing()
			.on(system)
			.withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
			.withCallContextProvider(callContextProvider)
			.withMetricsService(metricsService)
			.from(source)
			.to(sink)
			.run();
}
 
Example 13  Project: ditto  File: BulkWriteResultAckFlowTest.java
private String runBulkWriteResultAckFlowAndGetFirstLogEntry(final WriteResultAndErrors writeResultAndErrors) {
    return Source.single(writeResultAndErrors)
            .via(underTest.start())
            .runWith(Sink.head(), materializer)
            .toCompletableFuture()
            .join();
}
 
Example 14  Project: ari-proxy  File: AriEventProcessingTest.java
@DisplayName("Verify processing of both channel and playback events results in the expected kafka producer record")
@ParameterizedTest
@ValueSource(strings = { stasisStartEvent, playbackFinishedEvent, recordingFinishedEvent })
void generateProducerRecordFromAllAriMessageTypes(String ariEvent) {
	new TestKit(system)
	{
		{
			final Future<Source<ProducerRecord<String, String>, NotUsed>> wsToKafkaProcessor = Future.of(
					() -> AriEventProcessing
							.generateProducerRecordFromEvent(fakeCommandsTopic, fakeEventsAndResponsesTopic, new Strict(ariEvent), getRef(), system.log(), genApplicationReplacedHandler.apply(getRef()))
			);

			expectMsgClass(ProvideCallContext.class);
			reply(new CallContextProvided("CALL_CONTEXT"));

			final ProducerRecord<String, String> record = wsToKafkaProcessor
					.flatMap(source -> Future.fromCompletableFuture(source.runWith(
							Sink.last(),
							ActorMaterializer.create(ActorMaterializerSettings.create(system), system))
							.toCompletableFuture())
					)
					.await()
					.get();

			assertThat(record.key(), is("CALL_CONTEXT"));
			assertThat(record.topic(), is(fakeEventsAndResponsesTopic));
		}
	};
}
 
Publisher<HashTag> hashtags() {
		return Source
			.fromPublisher(this.tweets())
			.map(Tweet::getHashTags)
			.reduce((a, b) -> {
					Set<HashTag> tags = new HashSet<>();
					tags.addAll(a);
					tags.addAll(b);
					return tags;
			})
			.mapConcat(param -> param)
			.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), this.actorMaterializer);
}
 
Example 16  Project: reactive-code-workshop  File: AkkaApplication.java
private void dumpSourceToStdOut(Source<?,NotUsed> src) throws InterruptedException, ExecutionException {
    final ActorSystem system = ActorSystem.create("QuickStart");
    final Materializer materializer = ActorMaterializer.create(system);

    final CompletionStage<Done> done = src.runWith(Sink.foreach(a -> System.out.println(a)),materializer);
    done.thenRun(()->system.terminate());

    // Make it happen
    done.toCompletableFuture().get();
}
 
private String dumpSourceToString(Source<?,NotUsed> f) throws InterruptedException, ExecutionException {
    final ActorSystem system = ActorSystem.create("QuickStart");
    final Materializer materializer = ActorMaterializer.create(system);

    StringBuilder s = new StringBuilder();
    final CompletionStage<Done> done = f.runWith(Sink.foreach(a -> s.append(a)),materializer);

    done.thenRun(()->system.terminate());
    done.toCompletableFuture().get();

    return s.toString();
}
 
Example 18  Project: ts-reaktive  File: AkkaStreams.java
/**
 * Materializes the given source and waits for it to successfully emit one element. It then completes the returned
 * CompletionStage with the full stream. It will wait indefinitely for that first element, so timeouts will have to be handled
 * separately on the stream, returned future, or both.
 * 
 * This is useful in cases where you want to "fail early" when handling a stream result. For example, you might want
 * to build an http response based on a stream, but want to set a different status code if the stream fails
 * to emit any element.
 */
public static <T> CompletionStage<Source<T,NotUsed>> awaitOne(Source<T,?> source, Materializer materializer) {
    return source.prefixAndTail(1).map(pair -> {
        if (pair.first().isEmpty()) {
            return pair.second();
        } else {
            T head = pair.first().get(0);
            Source<T, NotUsed> tail = pair.second();
            return Source.single(head).concat(tail);                
        }
    }).runWith(Sink.head(), materializer);
}
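
A brief usage sketch for awaitOne follows; the events source, the log output, and the surrounding scope (including a materializer) are assumptions for illustration and are not part of AkkaStreams.java:

// Sketch only: 'materializer' is assumed to exist in the caller's scope.
final Source<String, NotUsed> events = Source.from(Arrays.asList("first", "second"));

AkkaStreams.awaitOne(events, materializer)
        .thenAccept(stream -> stream.runWith(Sink.foreach(System.out::println), materializer))
        .exceptionally(failure -> {
            // The stream failed before emitting its first element: "fail early" here,
            // e.g. by answering with an error status instead of a streamed body.
            System.err.println("stream failed early: " + failure.getMessage());
            return null;
        });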
 
@Test
public void test_a_source() {
    Sink<Object, TestSubscriber.Probe<Object>> sink = TestSink.probe(system);
    Source<Object, NotUsed> sourceUnderTest = Source.single("test");

    sourceUnderTest.runWith(sink, materializer)
            .request(1)
            .expectNext("test")
            .expectComplete();
}
 
public WebSocket ws() {
    return WebSocket.Text.acceptOrResult(req -> {
        return wireTransferService
            .transferStream()
            .invoke()
            .thenApply(source -> {
                return F.Either.Right(Flow.fromSinkAndSource(Sink.ignore(), source));
            });
    });
}
 
Example 21  Project: servicecomb-pack  File: KafkaSagaEventConsumer.java
public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
    MetricsService metricsService, String bootstrap_servers, String topic) {
  super(actorSystem, sagaShardRegionActor, metricsService);


  // init consumer
  final Materializer materializer = ActorMaterializer.create(actorSystem);
  final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
  final ConsumerSettings<String, String> consumerSettings =
      ConsumerSettings
          .create(consumerConfig, new StringDeserializer(), new StringDeserializer())
          .withBootstrapServers(bootstrap_servers)
          .withGroupId(groupId)
          .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
          .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
          .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
  Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(20, event -> {
        BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
        if (LOG.isDebugEnabled()) {
          LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId());
        }
        return sendSagaActor(bean).thenApply(done -> event.committableOffset());
      })
      .batch(
          100,
          ConsumerMessage::createCommittableOffsetBatch,
          ConsumerMessage.CommittableOffsetBatch::updated
      )
      .mapAsync(20, offset -> offset.commitJavadsl())
      .to(Sink.ignore())
      .run(materializer);
}
 
Example 22  Project: netty-reactive-streams  File: HttpHelper.java
public void cancelStreamedMessage(Object msg) {
    if (msg instanceof StreamedHttpMessage) {
        Source.fromPublisher((StreamedHttpMessage) msg).runWith(Sink.<HttpContent>cancelled(), materializer);
    } else {
        throw new IllegalArgumentException("Unknown message type: " + msg);
    }
}
 
Example 23  Project: ditto  File: DispatcherActor.java
private static Sink<ImmutableDispatch, ?> searchActorSink(final ActorRef pubSubMediator,
        final PreEnforcer preEnforcer) {
    return Sink.foreach(dispatchToPreEnforce ->
            preEnforce(dispatchToPreEnforce, preEnforcer, dispatch ->
                    pubSubMediator.tell(
                            DistPubSubAccess.send(SEARCH_ACTOR_PATH, dispatch.getMessage()),
                            dispatch.getSender())
            )
    );
}
 
Example 24  Project: tutorials  File: HomeControllerTest.java
@Test
public void givenMultigigabyteResponseConsumeWithStreams() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    final ActorSystem system = ActorSystem.create();
    final ActorMaterializer materializer = ActorMaterializer.create(system);
    final Path path = Files.createTempFile("tmp_", ".out");

    WSClient ws = play.test.WSTestClient.newClient(port);
    log.info("Starting test server on url: " + url);
    ws.url(url)
      .stream()
      .thenAccept(
        response -> {
            try {
                OutputStream outputStream = java.nio.file.Files.newOutputStream(path);
                Sink<ByteString, CompletionStage<Done>> outputWriter =
                  Sink.foreach(bytes -> {
                      log.info("Reponse: " + bytes.utf8String());
                      outputStream.write(bytes.toArray());
                  });

                response.getBodyAsSource()
                        .runWith(outputWriter, materializer);

            } catch (IOException e) {
                log.error("An error happened while opening the output stream", e);
            }
        })
      .whenComplete((value, error) -> latch.countDown());

    log.debug(
      "Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
    latch.await(5, TimeUnit.SECONDS);
    assertEquals(0, latch.getCount());
    log.debug("All requests have been completed. Exiting test.");
}
 
Example 25  Project: ditto  File: EnforcementProvider.java
/**
 * Convert this enforcement provider into a stream of enforcement tasks.
 *
 * @param preEnforcer failable future to execute before the actual enforcement.
 * @return the stream.
 */
@SuppressWarnings("unchecked") // due to GraphDSL usage
default Graph<FlowShape<Contextual<WithDittoHeaders>, EnforcementTask>, NotUsed> createEnforcementTask(
        final PreEnforcer preEnforcer
) {

    final Graph<FanOutShape2<Contextual<WithDittoHeaders>, Contextual<T>, Contextual<WithDittoHeaders>>, NotUsed>
            multiplexer = Filter.multiplexBy(contextual -> contextual.tryToMapMessage(this::mapToHandledClass));

    return GraphDSL.create(builder -> {
        final FanOutShape2<Contextual<WithDittoHeaders>, Contextual<T>, Contextual<WithDittoHeaders>> fanout =
                builder.add(multiplexer);

        final Flow<Contextual<T>, EnforcementTask, NotUsed> enforcementFlow =
                Flow.fromFunction(contextual -> buildEnforcementTask(contextual, preEnforcer));

        // by default, ignore unhandled messages:
        final SinkShape<Contextual<WithDittoHeaders>> unhandledSink = builder.add(Sink.ignore());

        final FlowShape<Contextual<T>, EnforcementTask> enforcementShape = builder.add(enforcementFlow);

        builder.from(fanout.out0()).toInlet(enforcementShape.in());
        builder.from(fanout.out1()).to(unhandledSink);

        return FlowShape.of(fanout.in(), enforcementShape.out());
    });
}
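
For readers unfamiliar with the GraphDSL wiring above, here is a stripped-down sketch of the same fan-out pattern using plain types: one outlet continues as the flow's output while the other is routed into Sink.ignore(), just like the unhandled messages in the Ditto code. The even/odd split and all names are illustrative assumptions, not Ditto code.

// Sketch: a Flow<Integer, Integer> that keeps even numbers and silently drops odd ones.
final Graph<FlowShape<Integer, Integer>, NotUsed> evenOnly = GraphDSL.create(builder -> {
    final UniformFanOutShape<Integer, Integer> partition =
            builder.add(Partition.<Integer>create(2, i -> i % 2 == 0 ? 0 : 1));

    final FlowShape<Integer, Integer> keepEven = builder.add(Flow.of(Integer.class));
    final SinkShape<Integer> dropOdd = builder.add(Sink.<Integer>ignore());

    builder.from(partition.out(0)).toInlet(keepEven.in());
    builder.from(partition.out(1)).to(dropOdd);

    return FlowShape.of(partition.in(), keepEven.out());
});

// usage: Source.range(1, 10).via(evenOnly).runWith(Sink.foreach(System.out::println), materializer);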
 
Example 26  Project: ditto  File: IndexInitializer.java
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
    return getIndicesExceptDefaultIndex(collectionName)
            .flatMapConcat(existingIndices -> {
                LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
                final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
                LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
                return dropIndices(collectionName, indicesToDrop);
            })
            .runWith(Sink.ignore(), materializer);
}
 
Example 27  Project: ts-reaktive  File: S3.java
/**
 * Loads the last known written offset from S3, or returns 0 if not found.
 */
public CompletionStage<Long> loadOffset() {
    return download("_lastOffset")
        .reduce((bs1, bs2) -> bs1.concat(bs2))
        .map(bs -> Long.parseLong(bs.utf8String()))
        .recoverWith(new PFBuilder<Throwable, Source<Long, NotUsed>>()
            .matchAny(x -> Source.single(0L)) // not found -> start at 0
            .build())
        .runWith(Sink.head(), materializer);
}
 
Example 28  Project: netty-reactive-streams  File: HttpHelper.java
public StreamedHttpResponse createStreamedResponse(HttpVersion version, List<String> body, long contentLength) {
    List<HttpContent> content = new ArrayList<>();
    for (String chunk: body) {
        content.add(new DefaultHttpContent(Unpooled.copiedBuffer(chunk, Charset.forName("utf-8"))));
    }
    Publisher<HttpContent> publisher = Source.from(content).runWith(Sink.<HttpContent>asPublisher(AsPublisher.WITH_FANOUT), materializer);
    StreamedHttpResponse response = new DefaultStreamedHttpResponse(version, HttpResponseStatus.OK, publisher);
    HttpUtil.setContentLength(response, contentLength);
    return response;
}
 
Example 29  Project: RHub  File: AkkaHubProxy.java
@Override
public Removable addUpstream(Source<Object, NotUsed> publisher) {
    UniqueKillSwitch killSwitch =
            publisher.viaMat(busFlow, Keep.right())
                    .to(Sink.ignore())
                    .run(mat);
    subscriptions.put(publisher, killSwitch);
    return () -> AkkaHubProxy.this.removeUpstream(publisher);
}
 
Example 30  Project: ditto  File: MongoTimestampPersistenceIT.java
@Test
public void ensureCollectionIsCapped() throws Exception {
    final MongoCollection<Document> collection =
            syncPersistence.getCollection().runWith(Sink.head(), materializer).toCompletableFuture().get();

    runBlocking(syncPersistence.setTimestamp(Instant.now()));
    runBlocking(syncPersistence.setTimestamp(Instant.now()));

    assertThat(runBlocking(Source.fromPublisher(collection.count()))).containsExactly(1L);
}