com.mongodb.client.model.BulkWriteOptions#akka.stream.ActorMaterializer源码实例Demo

下面列出了 com.mongodb.client.model.BulkWriteOptions#akka.stream.ActorMaterializer 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: java-specialagent   文件: PlayWSTest.java
/**
 * Sends a GET request through a standalone Play WS client and checks that exactly one span tagged
 * with the Play WS component name is reported to the tracer.
 *
 * @param tracer mock tracer whose finished spans are inspected
 * @throws Exception if waiting for the reported span fails
 */
@Test
public void test(final MockTracer tracer) throws Exception {
  final Materializer materializer = ActorMaterializer.create(system);
  final AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(
    new DefaultAsyncHttpClientConfig.Builder()
      .setMaxRequestRetry(0)
      .setShutdownQuietPeriod(0)
      .setShutdownTimeout(0)
      .build());

  try (final StandaloneAhcWSClient wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer)) {
    wsClient.url("http://localhost:1234").get().toCompletableFuture().get(15, TimeUnit.SECONDS);
  }
  catch (final Exception ignore) {
    // Intentionally discarded: the assertions below only look at the reported span.
  }

  await().atMost(15, TimeUnit.SECONDS).until(TestUtil.reportedSpansSize(tracer), equalTo(1));

  final List<MockSpan> finished = tracer.finishedSpans();
  assertEquals(1, finished.size());

  final MockSpan onlySpan = finished.get(0);
  assertEquals(PlayWSAgentIntercept.COMPONENT_NAME, onlySpan.tags().get(Tags.COMPONENT.getKey()));
}
 
源代码2 项目: ditto   文件: HttpPushClientActorTest.java
/**
 * Starts an actor system plus a local HTTP server whose handler records every incoming request
 * in {@code requestQueue} and answers it with the next element taken from {@code responseQueue},
 * then builds a connection that points at the port the server was bound to.
 */
@Before
public void createActorSystem() {
    // create actor system with deactivated hostname blacklist to connect to localhost
    actorSystem = ActorSystem.create(getClass().getSimpleName(),
            TestConstants.CONFIG.withValue("ditto.connectivity.connection.http-push.blacklisted-hostnames",
                    ConfigValueFactory.fromAnyRef("")));
    mat = ActorMaterializer.create(actorSystem);

    requestQueue = new LinkedBlockingQueue<>();
    responseQueue = new LinkedBlockingQueue<>();
    handler = Flow.fromFunction(incoming -> {
        requestQueue.offer(incoming);
        // Blocks until the test has supplied a response for this request.
        return responseQueue.take();
    });

    // Port 0 is passed for the bind; the port actually used is read back from the binding.
    binding = Http.get(actorSystem)
            .bindAndHandle(handler, ConnectHttp.toHost("127.0.0.1", 0), mat)
            .toCompletableFuture()
            .join();
    final int boundPort = binding.localAddress().getPort();
    connection = getHttpConnectionBuilderToLocalBinding(false, boundPort).build();
}
 
源代码3 项目: java-specialagent   文件: PlayWSITest.java
/**
 * Performs one GET request with a standalone Play WS client, fails unless the response status is 200,
 * and finally terminates the actor system and checks that one "play-ws" span was recorded.
 *
 * @param args unused
 * @throws Exception if the request does not complete within 15 seconds or fails
 */
public static void main(final String[] args) throws Exception {
  final ActorSystem system = ActorSystem.create();
  final Materializer materializer = ActorMaterializer.create(system);

  final AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(
    new DefaultAsyncHttpClientConfig.Builder()
      .setMaxRequestRetry(0)
      .setShutdownQuietPeriod(0)
      .setShutdownTimeout(0)
      .build());

  try (final StandaloneAhcWSClient wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer)) {
    final int statusCode = wsClient.url("http://www.google.com").get()
      .whenComplete((response, throwable) -> TestUtil.checkActiveSpan())
      .toCompletableFuture().get(15, TimeUnit.SECONDS)
      .getStatus();

    if (statusCode != 200) {
      throw new AssertionError("Response: " + statusCode);
    }
  }
  finally {
    system.terminate();
    TestUtil.checkSpan(new ComponentSpanCount("play-ws", 1));
  }
}
 
源代码4 项目: ari-proxy   文件: HealthService.java
/**
 * Requests an HTTP binding for the route returned by {@code createRoute()} on all interfaces
 * ("0.0.0.0") and the configured {@code httpPort}.
 *
 * @return the stage that completes with the server binding
 */
private CompletionStage<ServerBinding> startHttpServer() {
	final ActorSystem system = getContext().getSystem();
	final ActorMaterializer materializer = ActorMaterializer.create(system);
	final String address = "0.0.0.0";

	final CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(
			createRoute().flow(system, materializer),
			ConnectHttp.toHost(address, httpPort),
			materializer
	);

	// Logged right after the bind has been requested; the returned stage may still be pending here.
	log().info("HTTP server online at http://{}:{}/...", address, httpPort);

	return binding;
}
 
源代码5 项目: ditto-examples   文件: Kata3SearchAkkaStream.java
/**
 * Kata exercise: the source of things still has to be created by the reader (see the TODO below).
 * The verification part flattens the emitted lists, collects all elements and asserts on them.
 * The actor system is terminated in the {@code finally} block regardless of the outcome.
 */
@Test
public void part1CreateAkkaSearchQuery() {
    final ActorSystem system = ActorSystem.create("thing-search");
    try {

        final String filter = "or(eq(attributes/counter,1), eq(attributes/counter,2))";


        // TODO create Akka source of publisher with above filter
        // Placeholder: as long as this stays null, the verification below fails with a NullPointerException.
        final Source<List<Thing>, NotUsed> things = null;


        // Verify Results
        // Each emitted list is flattened into single elements, which are then collected into one sequence.
        things.flatMapConcat(Source::from)
                .toMat(Sink.seq(), Keep.right())
                .run(ActorMaterializer.create(system))
                .thenAccept(t -> Assertions.assertThat(t).containsAnyOf(thing1, thing2).doesNotContain(thing3))
                .toCompletableFuture()
                .join();
    } finally {
        system.terminate();
    }
}
 
源代码6 项目: ditto   文件: ConciergeRootActor.java
/**
 * Registers this actor at the pub-sub mediator, starts the enforcer actor, the cleanup coordinator
 * singleton and the health checking actor, and finally binds the HTTP status route.
 *
 * @param conciergeConfig the config of Concierge.
 * @param pubSubMediator the pub-sub mediator actor.
 * @param enforcerActorFactory factory used to start the enforcer actor.
 * @param materializer the materializer used for the HTTP status route.
 * @throws IllegalStateException if no child named like the ConciergeForwarder actor exists after the
 * enforcer actor was started.
 */
@SuppressWarnings("unused")
private <C extends ConciergeConfig> ConciergeRootActor(final C conciergeConfig,
        final ActorRef pubSubMediator,
        final EnforcerActorFactory<C> enforcerActorFactory,
        final ActorMaterializer materializer) {

    pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());

    final ActorContext context = getContext();
    final ShardRegions shardRegions = ShardRegions.of(getContext().getSystem(), conciergeConfig.getClusterConfig());

    enforcerActorFactory.startEnforcerActor(context, conciergeConfig, pubSubMediator, shardRegions);

    // Only the existence of the forwarder child is verified here; the reference itself is not needed.
    context.findChild(ConciergeForwarderActor.ACTOR_NAME).orElseThrow(() ->
            new IllegalStateException("ConciergeForwarder could not be found"));

    final Props cleanupCoordinatorProps = EventSnapshotCleanupCoordinator.props(
            conciergeConfig.getPersistenceCleanupConfig(), pubSubMediator, shardRegions);
    final ActorRef cleanupCoordinator =
            startClusterSingletonActor(EventSnapshotCleanupCoordinator.ACTOR_NAME, cleanupCoordinatorProps);

    final ActorRef healthChecker = startHealthCheckingActor(conciergeConfig, cleanupCoordinator);

    bindHttpStatusRoute(healthChecker, conciergeConfig.getHttpConfig(), materializer);
}
 
源代码7 项目: ditto   文件: ConciergeRootActor.java
/**
 * Binds the status/health HTTP route and registers a coordinated-shutdown task that terminates the
 * binding gracefully. If the binding cannot be established, the actor system is terminated.
 *
 * @param healthCheckingActor the actor the created route delegates to.
 * @param httpConfig provides hostname and port to bind to; an empty hostname is replaced by the
 * address supplied by {@code LocalHostAddressSupplier}.
 * @param materializer the materializer used to run the route flow.
 */
private void bindHttpStatusRoute(final ActorRef healthCheckingActor, final HttpConfig httpConfig,
        final ActorMaterializer materializer) {

    String hostname = httpConfig.getHostname();
    if (hostname.isEmpty()) {
        hostname = LocalHostAddressSupplier.getInstance().get();
        log.info("No explicit hostname configured, using HTTP hostname: {}", hostname);
    }

    // Everything that needs the actor context is resolved here, while still running inside the actor.
    // The callbacks registered below are executed by whichever thread completes the binding stage, and
    // the actor context must not be accessed from there.
    final CoordinatedShutdown coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
    // A bound method reference evaluates its receiver immediately, so the actor system is captured now.
    final Runnable terminateActorSystem = getContext().system()::terminate;

    final CompletionStage<ServerBinding> binding = Http.get(getContext().system())
            .bindAndHandle(createRoute(getContext().system(), healthCheckingActor).flow(getContext().system(),
                    materializer), ConnectHttp.toHost(hostname, httpConfig.getPort()), materializer);

    binding.thenAccept(theBinding -> coordinatedShutdown.addTask(
            CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
                log.info("Gracefully shutting down status/health HTTP endpoint..");
                return theBinding.terminate(Duration.ofSeconds(1))
                        .handle((httpTerminated, e) -> Done.getInstance());
            })
    ).exceptionally(failure -> {
        log.error(failure, "Something very bad happened: {}", failure.getMessage());
        terminateActorSystem.run();
        return null;
    });
}
 
源代码8 项目: ditto   文件: MongoHealthChecker.java
/**
 * Creates the health checker: builds a MongoDB client from the Ditto-scoped configuration of the actor
 * system, obtains the test collection and creates a materializer bound to this actor's context.
 */
private MongoHealthChecker() {

        final DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of(
                DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
        // The connection pool of the health check client is capped at HEALTH_CHECK_MAX_POOL_SIZE.
        mongoClient = MongoClientWrapper.getBuilder(mongoDbConfig)
                .connectionPoolMaxSize(HEALTH_CHECK_MAX_POOL_SIZE)
                .build();

        /*
         * It's important to set the read preference to "primary preferred" because replication is too slow to
         * retrieve the inserted document from a secondary directly after inserting it on the primary.
         */
        collection = mongoClient.getCollection(TEST_COLLECTION_NAME)
                .withReadPreference(ReadPreference.primaryPreferred());

        materializer = ActorMaterializer.create(getContext());
    }
 
源代码9 项目: ditto   文件: MongoReadJournal.java
/**
 * Builds a source of batches by repeatedly running a source from a moving start position.
 * Each step runs the source created for the current start position to completion, emits the collected
 * elements as one list and derives the next start position from the last element of that list.
 * The unfolding ends as soon as a step yields no elements.
 *
 * @param lowerBound the smallest start position ever passed to {@code sourceCreator}.
 * @param mat the materializer used to run the source of each step.
 * @param seedCreator computes the next start position from the last element of a batch.
 * @param sourceCreator creates the source of one batch from a start position.
 * @param <T> the element type.
 * @return the source of batches.
 */
private <T> Source<List<T>, NotUsed> unfoldBatchedSource(
        final String lowerBound,
        final ActorMaterializer mat,
        final Function<T, String> seedCreator,
        final Function<String, Source<T, ?>> sourceCreator) {

    return Source.unfoldAsync("",
            start -> {
                // Never start below the lower bound; the initial seed "" is replaced by it this way.
                final String actualStart;
                if (lowerBound.compareTo(start) >= 0) {
                    actualStart = lowerBound;
                } else {
                    actualStart = start;
                }
                return sourceCreator.apply(actualStart)
                        .runWith(Sink.seq(), mat)
                        .thenApply(list -> {
                            if (list.isEmpty()) {
                                return Optional.empty();
                            }
                            final T lastElement = list.get(list.size() - 1);
                            return Optional.of(Pair.create(seedCreator.apply(lastElement), list));
                        });
            })
            .withAttributes(Attributes.inputBuffer(1, 1));
}
 
源代码10 项目: ditto   文件: AbstractPersistenceOperationsActor.java
private AbstractPersistenceOperationsActor(final ActorRef pubSubMediator,
        final EntityType entityType,
        @Nullable final NamespacePersistenceOperations namespaceOps,
        @Nullable final EntityPersistenceOperations entitiesOps,
        final PersistenceOperationsConfig persistenceOperationsConfig,
        final Collection<Closeable> toCloseWhenStopped) {

    this.pubSubMediator = checkNotNull(pubSubMediator, "pub-sub mediator");
    this.entityType = checkNotNull(entityType, "entityType");
    if (namespaceOps == null && entitiesOps == null) {
        throw new IllegalArgumentException("At least one of namespaceOps or entitiesOps must be specified.");
    }
    this.namespaceOps = namespaceOps;
    this.entitiesOps = entitiesOps;
    this.toCloseWhenStopped = Collections.unmodifiableCollection(toCloseWhenStopped);
    materializer = ActorMaterializer.create(getContext());
    delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown();
    logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
}
 
源代码11 项目: ditto   文件: IndexInitializerIT.java
/**
 * Creates the actor system, the materializer and a MongoDB client that uses a uniquely named default
 * database, then instantiates the index initializer under test and the index operations on it.
 */
@Before
public void before() {
    system = ActorSystem.create("AkkaTestSystem");
    materializer = ActorMaterializer.create(system);

    requireNonNull(mongoResource);
    requireNonNull(materializer);

    // A random suffix gives every test run its own database name.
    final String databaseName = getClass().getSimpleName() + "-" + UUID.randomUUID();
    final Duration maxWaitTime = Duration.ofSeconds(CONNECTION_POOL_MAX_WAIT_TIME_SECS);

    mongoClient = MongoClientWrapper.getBuilder()
            .hostnameAndPort(mongoResource.getBindIp(), mongoResource.getPort())
            .defaultDatabaseName(databaseName)
            .connectionPoolMaxSize(CONNECTION_POOL_MAX_SIZE)
            .connectionPoolMaxWaitQueueSize(CONNECTION_POOL_MAX_WAIT_QUEUE_SIZE)
            .connectionPoolMaxWaitTime(maxWaitTime)
            .build();

    indexInitializerUnderTest = IndexInitializer.of(mongoClient.getDefaultDatabase(), materializer);
    indexOperations = IndexOperations.of(mongoClient.getDefaultDatabase());
}
 
源代码12 项目: ditto   文件: AbstractGraphActor.java
/**
 * Materializes the processing stream of this actor and returns the queue feeding it.
 * Elements offered while the buffer is full are dropped ({@code OverflowStrategy.dropNew()}).
 *
 * @param materializer the materializer used to run the stream.
 * @return the source queue of the running stream.
 */
private SourceQueueWithComplete<T> getSourceQueue(final ActorMaterializer materializer) {
    // The stream is supposed to survive forever, hence its completion and its failure are both logged
    // at level ERROR, while elements are logged at level DEBUG.
    final Attributes logLevels = Attributes.logLevels(
            Attributes.logLevelDebug(),
            Attributes.logLevelError(),
            Attributes.logLevelError());

    return Source.<T>queue(getBufferSize(), OverflowStrategy.dropNew())
            // stage 1: element taken from the queue
            .map(this::incrementDequeueCounter)
            .log("graph-actor-stream-1-dequeued", logger)
            .withAttributes(logLevels)
            // stage 2: pre-processing
            .via(Flow.fromFunction(this::beforeProcessMessage))
            .log("graph-actor-stream-2-preprocessed", logger)
            .withAttributes(logLevels)
            // stage 3: actual processing
            .via(processMessageFlow())
            .log("graph-actor-stream-3-processed", logger)
            .withAttributes(logLevels)
            .to(processedMessageSink())
            .run(materializer);
}
 
源代码13 项目: ditto   文件: HttpPublisherActor.java
/**
 * Reads the HTTP push configuration from the actor system settings and materializes the request
 * stream: a bounded queue of request/context pairs that is sent through the flow created by the
 * factory; every element leaving the flow is handed to {@code processResponse}.
 *
 * @param connection the connection this publisher belongs to.
 * @param factory the factory creating the HTTP flow.
 */
@SuppressWarnings("unused")
private HttpPublisherActor(final Connection connection, final HttpPushFactory factory) {
    super(connection);
    this.factory = factory;

    final ActorSystem actorSystem = getContext().getSystem();
    config = DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()))
            .getConnectionConfig()
            .getHttpPushConfig();

    materializer = ActorMaterializer.create(getContext());

    // Offers beyond the configured maximum queue size are dropped (OverflowStrategy.dropNew()).
    sourceQueue = Source
            .<Pair<HttpRequest, HttpPushContext>>queue(config.getMaxQueueSize(), OverflowStrategy.dropNew())
            .viaMat(factory.createFlow(actorSystem, log), Keep.left())
            .toMat(Sink.foreach(this::processResponse), Keep.left())
            .run(materializer);
}
 
源代码14 项目: ditto   文件: StreamingActor.java
/**
 * Stores the collaborators of this actor, derives the JWT helpers from the given factory, prepares the
 * props of the subscription manager and schedules the scraping of the streaming sessions counter.
 *
 * @param dittoProtocolSub stored in the field of the same name.
 * @param commandRouter stored in the field of the same name.
 * @param jwtAuthenticationFactory supplies the JWT validator and the authentication result provider.
 * @param streamingConfig stored in the field of the same name; also supplies the search idle timeout.
 * @param headerTranslator stored in the field of the same name.
 * @param pubSubMediator passed on to the subscription manager props.
 * @param conciergeForwarder passed on to the subscription manager props.
 */
@SuppressWarnings("unused")
private StreamingActor(final DittoProtocolSub dittoProtocolSub,
        final ActorRef commandRouter,
        final JwtAuthenticationFactory jwtAuthenticationFactory,
        final StreamingConfig streamingConfig,
        final HeaderTranslator headerTranslator,
        final ActorRef pubSubMediator,
        final ActorRef conciergeForwarder) {

    this.dittoProtocolSub = dittoProtocolSub;
    this.commandRouter = commandRouter;
    this.streamingConfig = streamingConfig;
    this.headerTranslator = headerTranslator;
    streamingSessionsCounter = DittoMetrics.gauge("streaming_sessions_count");
    jwtValidator = jwtAuthenticationFactory.getJwtValidator();
    jwtAuthenticationResultProvider = jwtAuthenticationFactory.newJwtAuthenticationResultProvider();
    // The materializer handed to the subscription manager is created from this actor's context.
    subscriptionManagerProps =
            SubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), pubSubMediator, conciergeForwarder,
                    ActorMaterializer.create(getContext()));
    scheduleScrapeStreamSessionsCounter();
}
 
源代码15 项目: ditto   文件: ThingsSearchCursor.java
/**
 * Extracts the cursor of a {@code QueryThings} command, if there is one.
 * <p>
 * The checks are applied in this order: an invalid size option, no cursor option at all, more than one
 * cursor option, a limit option next to the cursor. Only if all of them pass is the single cursor decoded
 * and validated against the command.
 *
 * @param queryThings the command.
 * @param materializer materializer used for decoding the cursor.
 * @return source of an empty optional if the command has no cursor, source of the cursor if the command has a
 * valid cursor, or a failed source if the command has an invalid cursor or invalid options.
 */
static Source<Optional<ThingsSearchCursor>, NotUsed> extractCursor(final QueryThings queryThings,
        final ActorMaterializer materializer) {

    return catchExceptions(queryThings.getDittoHeaders(), () -> {
        final List<Option> options = getOptions(queryThings);
        final List<CursorOption> cursorOptions = findAll(CursorOption.class, options);
        final List<LimitOption> limitOptions = findAll(LimitOption.class, options);
        final Optional<InvalidOptionException> sizeOptionError = checkSizeOption(options, queryThings);

        if (sizeOptionError.isPresent()) {
            return Source.failed(sizeOptionError.get());
        }
        if (cursorOptions.isEmpty()) {
            return Source.single(Optional.empty());
        }
        if (cursorOptions.size() > 1) {
            // a single command must not carry 2 or more cursor options.
            return Source.failed(invalidCursor("There may not be more than 1 'cursor' option.", queryThings));
        }
        if (!limitOptions.isEmpty()) {
            return Source.failed(invalidCursor(LIMIT_OPTION_FORBIDDEN, queryThings));
        }

        final CursorOption theCursorOption = cursorOptions.get(0);
        return decode(theCursorOption.getCursor(), materializer)
                .flatMapConcat(cursor -> cursor.checkCursorValidity(queryThings, options)
                        .<Source<Optional<ThingsSearchCursor>, NotUsed>>map(Source::failed)
                        .orElse(Source.single(Optional.of(cursor))));
    });
}
 
源代码16 项目: java-specialagent   文件: AkkaHttpServerITest.java
/**
 * Runs the synchronous and the asynchronous HTTP server test against a fresh actor system and waits
 * up to 15 seconds for the system to terminate afterwards.
 *
 * @param args unused
 * @throws Exception if one of the tests fails or the termination does not finish in time
 */
public static void main(final String[] args) throws Exception {
  final ActorSystem actorSystem = ActorSystem.create();
  final Materializer streamMaterializer = ActorMaterializer.create(actorSystem);
  final Http http = getHttp(actorSystem);

  testSync(http, streamMaterializer);
  testAsync(http, streamMaterializer);

  Await.result(actorSystem.terminate(), Duration.create(15, TimeUnit.SECONDS));
}
 
/**
 * Wires the given message source to the given sink and runs the resulting stream.
 * Every message is additionally tapped for metrics and converted into producer records.
 * Failures inside the stream are logged and the stream is resumed.
 *
 * @param system the actor system used for logging and for creating the materializer
 * @param callContextProvider passed to the metrics gathering and the record generation
 * @param metricsService passed to the metrics gathering
 * @param source the source of incoming messages
 * @param sink the sink receiving the generated producer records
 * @param applicationReplacedHandler passed to the record generation
 * @return the materializer the stream runs on
 */
private static ActorMaterializer run(ActorSystem system, ActorRef callContextProvider, ActorRef metricsService,
		Source<Message, NotUsed> source, Sink<ProducerRecord<String, String>, NotUsed> sink,
		Runnable applicationReplacedHandler) {
	// Log the failure and carry on with the next element.
	final Function<Throwable, Supervision.Directive> logAndResume = failure -> {
		system.log().error(failure, failure.getMessage());
		return Supervision.resume();
	};

	final Config kafkaConfig = ConfigFactory.load().getConfig(SERVICE).getConfig(KAFKA);
	final String commandsTopic = kafkaConfig.getString(COMMANDS_TOPIC);
	final String eventsAndResponsesTopic = kafkaConfig.getString(EVENTS_AND_RESPONSES_TOPIC);

	final ActorMaterializerSettings settings =
			ActorMaterializerSettings.create(system).withSupervisionStrategy(logAndResume);
	final ActorMaterializer materializer = ActorMaterializer.create(settings, system);

	source
			//.throttle(4 * 13, Duration.ofSeconds(1)) // Note: We die right now for calls/s >= 4.8
			.wireTap(Sink.foreach(message -> gatherMetrics(message, metricsService, callContextProvider)))
			.flatMapConcat(message -> generateProducerRecordFromEvent(
					commandsTopic, eventsAndResponsesTopic, message, callContextProvider, system.log(),
					applicationReplacedHandler))
			.log(">>>   ARI EVENT", producerRecord -> producerRecord.value()).withAttributes(LOG_LEVELS)
			.to(sink)
			.run(materializer);

	return materializer;
}
 
源代码18 项目: ari-proxy   文件: Main.java
/**
 * Sets up the Kafka consumer source for the commands topic and the Kafka producer sink, and runs the
 * ARI command/response processing between them. Each request is executed as a single HTTP request.
 * The consumer source is wrapped in a restart source with backoff (5 to 10 seconds, random factor 0.2).
 *
 * @param kafkaConfig supplies bootstrap servers, consumer group and the commands topic
 * @param system the actor system
 * @param callContextProvider handed to the processor
 * @param metricsService handed to the processor
 * @return the materializer returned by the processor's {@code run()}
 */
private static ActorMaterializer runAriCommandResponseProcessor(
		Config kafkaConfig,
		ActorSystem system,
		ActorRef callContextProvider,
		ActorRef metricsService) {
	final ConsumerSettings<String, String> kafkaConsumerSettings = ConsumerSettings
			.create(system, new StringDeserializer(), new StringDeserializer())
			.withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS))
			.withGroupId(kafkaConfig.getString(CONSUMER_GROUP))
			.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
			.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

	final ProducerSettings<String, String> kafkaProducerSettings = ProducerSettings
			.create(system, new StringSerializer(), new StringSerializer())
			.withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS));

	final Source<ConsumerRecord<String, String>, NotUsed> commandSource = RestartSource.withBackoff(
			Duration.of(5, ChronoUnit.SECONDS),
			Duration.of(10, ChronoUnit.SECONDS),
			0.2,
			() -> Consumer
					.plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.getString(COMMANDS_TOPIC)))
					// the consumer control is not needed, replace it with NotUsed
					.mapMaterializedValue(ignoredControl -> NotUsed.getInstance())
	);

	final Sink<ProducerRecord<String, String>, NotUsed> responseSink = Producer
			.plainSink(kafkaProducerSettings)
			.mapMaterializedValue(ignoredDone -> NotUsed.getInstance());

	return AriCommandResponseKafkaProcessor.commandResponseProcessing()
			.on(system)
			.withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
			.withCallContextProvider(callContextProvider)
			.withMetricsService(metricsService)
			.from(commandSource)
			.to(responseSink)
			.run();
}
 
源代码19 项目: ari-proxy   文件: AriEventProcessingTest.java
/**
 * Feeds one ARI event into {@code AriEventProcessing.generateProducerRecordFromEvent} and checks that the
 * last producer record of the resulting source has the provided call context as key and the
 * events-and-responses topic as topic.
 *
 * @param ariEvent the raw ARI event, one value per parameterized run
 */
@DisplayName("Verify processing of both channel and playback events results in the expected kafka producer record")
@ParameterizedTest
@ValueSource(strings = { stasisStartEvent, playbackFinishedEvent, recordingFinishedEvent })
void generateProducerRecordFromAllAriMessageTypes(String ariEvent) {
	new TestKit(system)
	{
		{
			// The record generation is started inside a Future so that this test can answer the call context
			// request below in the meantime (presumably the generation waits for that answer — TODO confirm).
			final Future<Source<ProducerRecord<String, String>, NotUsed>> wsToKafkaProcessor = Future.of(
					() -> AriEventProcessing
							.generateProducerRecordFromEvent(fakeCommandsTopic, fakeEventsAndResponsesTopic, new Strict(ariEvent), getRef(), system.log(), genApplicationReplacedHandler.apply(getRef()))
			);

			// The test kit acts as call context provider: expect the request and reply with a fixed context.
			expectMsgClass(ProvideCallContext.class);
			reply(new CallContextProvided("CALL_CONTEXT"));

			// Run the generated source and take its last element.
			final ProducerRecord<String, String> record = wsToKafkaProcessor
					.flatMap(source -> Future.fromCompletableFuture(source.runWith(
							Sink.last(),
							ActorMaterializer.create(ActorMaterializerSettings.create(system), system))
							.toCompletableFuture())
					)
					.await()
					.get();

			assertThat(record.key(), is("CALL_CONTEXT"));
			assertThat(record.topic(), is(fakeEventsAndResponsesTopic));
		}
	};
}
 
源代码20 项目: reactive-code-workshop   文件: AkkaApplication.java
/**
 * Runs the given source on a temporary actor system and prints every element to standard out.
 * The actor system is terminated once the stream is over, whether it completed, failed or the wait
 * was interrupted, so that no actor system is left running after a failed stream.
 *
 * @param src the source to run
 * @throws InterruptedException if the calling thread is interrupted while waiting for the stream
 * @throws ExecutionException if the stream fails
 */
private void dumpSourceToStdOut(Source<?,NotUsed> src) throws InterruptedException, ExecutionException {
    final ActorSystem system = ActorSystem.create("QuickStart");
    try {
        final Materializer materializer = ActorMaterializer.create(system);
        final CompletionStage<Done> done = src.runWith(Sink.foreach(a -> System.out.println(a)),materializer);

        // Make it happen
        done.toCompletableFuture().get();
    } finally {
        // Previously the system was only terminated on successful completion of the stream.
        system.terminate();
    }
}
 
/**
 * Runs the given source on a temporary actor system and concatenates all elements into one string.
 * The actor system is terminated once the stream is over, whether it completed, failed or the wait
 * was interrupted, so that no actor system is left running after a failed stream.
 *
 * @param f the source to run
 * @return the concatenation of the string representations of all elements, in stream order
 * @throws InterruptedException if the calling thread is interrupted while waiting for the stream
 * @throws ExecutionException if the stream fails
 */
private String dumpSourceToString(Source<?,NotUsed> f) throws InterruptedException, ExecutionException {
    final ActorSystem system = ActorSystem.create("QuickStart");
    try {
        final Materializer materializer = ActorMaterializer.create(system);

        StringBuilder s = new StringBuilder();
        final CompletionStage<Done> done = f.runWith(Sink.foreach(a -> s.append(a)),materializer);

        // Wait for the stream to finish before reading the collected text.
        done.toCompletableFuture().get();

        return s.toString();
    } finally {
        // Previously the system was only terminated on successful completion of the stream.
        system.terminate();
    }
}
 
源代码22 项目: reactive-streams-in-java   文件: Application.java
/**
 * Creates the "Akka" message consumer bean and starts an Akka stream that forwards every message of
 * the given flux to it.
 *
 * @param messageFlux the publisher of messages
 * @param actorSystem the actor system the stream runs on
 * @return the consumer receiving the messages
 */
@Lazy(false)
@Bean
MessageConsumer createAkkaMessageConsumer(Flux<String> messageFlux, ActorSystem actorSystem) {
    final MessageConsumer akkaConsumer = new MessageConsumer("Akka");
    final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);

    // Consume the messages with Akka Streams; at most 100 elements are buffered before the
    // stream backpressures the publisher.
    Source.fromPublisher(messageFlux)
            .buffer(100, OverflowStrategy.backpressure())
            .to(Sink.foreach(message -> akkaConsumer.accept(message)))
            .run(materializer);

    return akkaConsumer;
}
 
源代码23 项目: ditto   文件: HttpPublisherActor.java
/**
 * Reads the complete entity of an HTTP response and decodes it as UTF-8 text.
 *
 * @param response the response whose body is read.
 * @param materializer the materializer used to collect the entity.
 * @return stage of the body text; collecting the entity is limited by {@code READ_BODY_TIMEOUT_MS}.
 */
private static CompletionStage<String> getResponseBody(final HttpResponse response,
        final ActorMaterializer materializer) {
    final CompletionStage<HttpEntity.Strict> strictEntity =
            response.entity().toStrict(READ_BODY_TIMEOUT_MS, materializer);
    return strictEntity.thenApply(strict -> strict.getData())
            .thenApply(data -> data.utf8String());
}
 
源代码24 项目: reactive-stock-trader   文件: StreamIT.java
/**
 * Creates the shared fixtures of this test class: the Lagom client factory, an actor system and a
 * materializer running on that actor system.
 */
@BeforeClass
public static void setup() {
    // The factory is created under the name "integration-test" with the class loader of this test class.
    clientFactory = LagomClientFactory.create("integration-test", StreamIT.class.getClassLoader());
    // One of the clients can use the service locator, the other can use the service gateway, to test them both.

    system = ActorSystem.create();
    mat = ActorMaterializer.create(system);
}
 
源代码25 项目: servicecomb-pack   文件: KafkaSagaEventConsumer.java
/**
 * Creates the consumer and immediately starts a stream that reads saga events from the given Kafka
 * topic, forwards each event to the saga actor and commits the offsets in batches.
 *
 * @param actorSystem the actor system the stream runs on; also supplies the "akka.kafka.consumer" config
 * @param sagaShardRegionActor passed to the super class
 * @param metricsService passed to the super class
 * @param bootstrap_servers the Kafka bootstrap servers
 * @param topic the topic to subscribe to
 */
public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
    MetricsService metricsService, String bootstrap_servers, String topic) {
  super(actorSystem, sagaShardRegionActor, metricsService);


  // init consumer
  final Materializer materializer = ActorMaterializer.create(actorSystem);
  final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
  final ConsumerSettings<String, String> consumerSettings =
      ConsumerSettings
          .create(consumerConfig, new StringDeserializer(), new StringDeserializer())
          .withBootstrapServers(bootstrap_servers)
          .withGroupId(groupId)
          // offsets are committed explicitly at the end of the stream, not automatically
          .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
          // these properties expect a class name; the literal text "StringDeserializer.class" is not one
          .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
          .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(20, event -> {
        BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
        if (LOG.isDebugEnabled()) {
          LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId());
        }
        // the offset is only passed on once the saga actor has been sent the event
        return sendSagaActor(bean).thenApply(done -> event.committableOffset());
      })
      .batch(
          100,
          ConsumerMessage::createCommittableOffsetBatch,
          ConsumerMessage.CommittableOffsetBatch::updated
      )
      .mapAsync(20, offset -> offset.commitJavadsl())
      .to(Sink.ignore())
      .run(materializer);
}
 
/**
 * Starts the cluster node: creates the actor system, registers the cluster listener and joins the
 * seed nodes listed in the environment variable {@code SEED_NODES} (comma-separated hosts, port 2551).
 *
 * @param args unused
 * @throws IOException declared by the original signature
 * @throws IllegalStateException if {@code SEED_NODES} is not set or blank
 */
public static void main(String[] args) throws IOException {
  // Validate the environment first: failing after the actor system has been created would leave
  // the system running behind an exception.
  final String seedNodes = System.getenv("SEED_NODES");
  if (seedNodes == null || seedNodes.trim().isEmpty()) {
    throw new IllegalStateException(
        "Environment variable SEED_NODES must contain a comma-separated list of seed node hosts");
  }

  ActorSystem actorSystem = ActorSystem.create(CLUSTER_NAME);
  actorSystem.actorOf(SimpleClusterListener.props());
  // NOTE(review): the materializer is not referenced below — confirm whether it is still needed.
  final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);

  Cluster cluster = Cluster.get(actorSystem);
  List<Address> addresses = Arrays.stream(seedNodes.split(","))
      // tolerate whitespace around the separators and skip empty entries
      .map(String::trim)
      .filter(ip -> !ip.isEmpty())
      .map(ip -> new Address("akka.tcp", CLUSTER_NAME, ip, 2551))
      .collect(Collectors.toList());
  cluster.joinSeedNodes(addresses);
}
 
源代码27 项目: ditto   文件: DittoService.java
/**
 * Starts the HTTP endpoint from which Prometheus may scrape metrics.
 * Nothing happens unless Prometheus is enabled in the metrics config and a Prometheus reporter exists.
 * If the endpoint cannot be bound, the actor system is terminated.
 *
 * @param actorSystem the actor system the endpoint is bound on.
 */
private void startKamonPrometheusHttpEndpoint(final ActorSystem actorSystem) {
    final MetricsConfig metricsConfig = serviceSpecificConfig.getMetricsConfig();
    if (!metricsConfig.isPrometheusEnabled() || null == prometheusReporter) {
        return;
    }

    final String host = metricsConfig.getPrometheusHostname();
    final int port = metricsConfig.getPrometheusPort();
    final ActorMaterializer materializer = createActorMaterializer(actorSystem);
    final Route reporterRoute = PrometheusReporterRoute.buildPrometheusReporterRoute(prometheusReporter);

    final CompletionStage<ServerBinding> binding = Http.get(actorSystem)
            .bindAndHandle(reporterRoute.flow(actorSystem, materializer),
                    ConnectHttp.toHost(host, port), materializer);

    binding.thenAccept(theBinding -> CoordinatedShutdown.get(actorSystem).addTask(
            CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_prometheus_http_endpoint", () -> {
                logger.info("Gracefully shutting down Prometheus HTTP endpoint ...");
                // scrape requests get only one second to finish once shutdown has started:
                return theBinding.terminate(Duration.ofSeconds(1))
                        .handle((httpTerminated, e) -> Done.getInstance());
            })
    ).exceptionally(failure -> {
        logger.error("Kamon Prometheus HTTP endpoint could not be started: {}", failure.getMessage(), failure);
        logger.error("Terminating ActorSystem!");
        actorSystem.terminate();
        return null;
    });
}
 
源代码28 项目: ditto   文件: ConciergeRootActor.java
/**
 * Creates Akka configuration object Props for this actor.
 * All arguments are checked for {@code null} before the Props object is created.
 *
 * @param conciergeConfig the config of Concierge.
 * @param pubSubMediator the PubSub mediator Actor.
 * @param enforcerActorFactory factory for creating sharded enforcer actors.
 * @param materializer the materializer for the Akka actor system.
 * @param <C> the concrete type of the Concierge config.
 * @return the Akka configuration Props object.
 * @throws NullPointerException if any argument is {@code null}.
 */
public static <C extends ConciergeConfig> Props props(final C conciergeConfig,
        final ActorRef pubSubMediator,
        final EnforcerActorFactory<C> enforcerActorFactory,
        final ActorMaterializer materializer) {

    checkNotNull(conciergeConfig, "config of Concierge");
    checkNotNull(pubSubMediator, "pub-sub mediator");
    checkNotNull(enforcerActorFactory, "EnforcerActor factory");
    checkNotNull(materializer, "ActorMaterializer");

    // NOTE(review): the arguments are presumably matched against a ConciergeRootActor constructor,
    // so their order here has to follow that constructor's parameter list — confirm before reordering.
    return Props.create(ConciergeRootActor.class, conciergeConfig, pubSubMediator, enforcerActorFactory,
            materializer);
}
 
源代码29 项目: ditto   文件: SearchActorIT.java
/**
 * Creates an actor system that defines a pinned "search-dispatcher", a materializer on top of it,
 * the read and write persistence and the handle of the things collection.
 */
@Before
public void before() {
    final String dispatcherConfig = "search-dispatcher {\n" +
            "  type = PinnedDispatcher\n" +
            "  executor = \"thread-pool-executor\"\n" +
            "}";

    actorSystem = ActorSystem.create(getClass().getSimpleName(), ConfigFactory.parseString(dispatcherConfig));
    materializer = ActorMaterializer.create(actorSystem);

    readPersistence = provideReadPersistence();
    writePersistence = provideWritePersistence();
    thingsCollection = mongoClient.getDefaultDatabase().getCollection(PersistenceConstants.THINGS_COLLECTION_NAME);
}
 
源代码30 项目: ditto   文件: ConciergeService.java
/**
 * Creates the props of the Concierge root actor, using a {@code DefaultEnforcerActorFactory} for the
 * enforcer actors.
 *
 * @param serviceSpecificConfig the Concierge config.
 * @param pubSubMediator the pub-sub mediator actor.
 * @param materializer the materializer handed to the root actor.
 * @return the props of the root actor.
 */
@Override
protected Props getMainRootActorProps(final ConciergeConfig serviceSpecificConfig, final ActorRef pubSubMediator,
        final ActorMaterializer materializer) {

    final DefaultEnforcerActorFactory enforcerActorFactory = new DefaultEnforcerActorFactory();
    return ConciergeRootActor.props(serviceSpecificConfig, pubSubMediator, enforcerActorFactory, materializer);
}