下面列出了com.mongodb.client.model.BulkWriteOptions#akka.stream.ActorMaterializer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void test(final MockTracer tracer) throws Exception {
final Materializer materializer = ActorMaterializer.create(system);
final AsyncHttpClientConfig asyncHttpClientConfig = new DefaultAsyncHttpClientConfig.Builder()
.setMaxRequestRetry(0)
.setShutdownQuietPeriod(0)
.setShutdownTimeout(0)
.build();
final AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(asyncHttpClientConfig);
try (final StandaloneAhcWSClient wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer)) {
wsClient.url("http://localhost:1234").get().toCompletableFuture().get(15, TimeUnit.SECONDS);
}
catch (final Exception ignore) {
}
await().atMost(15, TimeUnit.SECONDS).until(TestUtil.reportedSpansSize(tracer), equalTo(1));
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertEquals(PlayWSAgentIntercept.COMPONENT_NAME, spans.get(0).tags().get(Tags.COMPONENT.getKey()));
}
@Before
public void createActorSystem() {
// create actor system with deactivated hostname blacklist to connect to localhost
actorSystem = ActorSystem.create(getClass().getSimpleName(),
TestConstants.CONFIG.withValue("ditto.connectivity.connection.http-push.blacklisted-hostnames",
ConfigValueFactory.fromAnyRef("")));
mat = ActorMaterializer.create(actorSystem);
requestQueue = new LinkedBlockingQueue<>();
responseQueue = new LinkedBlockingQueue<>();
handler = Flow.fromFunction(request -> {
requestQueue.offer(request);
return responseQueue.take();
});
binding = Http.get(actorSystem)
.bindAndHandle(handler, ConnectHttp.toHost("127.0.0.1", 0), mat)
.toCompletableFuture()
.join();
connection = getHttpConnectionBuilderToLocalBinding(false, binding.localAddress().getPort()).build();
}
public static void main(final String[] args) throws Exception {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final AsyncHttpClientConfig asyncHttpClientConfig = new DefaultAsyncHttpClientConfig.Builder()
.setMaxRequestRetry(0)
.setShutdownQuietPeriod(0)
.setShutdownTimeout(0)
.build();
final AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(asyncHttpClientConfig);
try (final StandaloneAhcWSClient wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer)) {
final int status = wsClient.url("http://www.google.com").get()
.whenComplete((response, throwable) -> TestUtil.checkActiveSpan())
.toCompletableFuture().get(15, TimeUnit.SECONDS)
.getStatus();
if (200 != status)
throw new AssertionError("Response: " + status);
}
finally {
system.terminate();
TestUtil.checkSpan(new ComponentSpanCount("play-ws", 1));
}
}
private CompletionStage<ServerBinding> startHttpServer() {
final ActorSystem system = getContext().getSystem();
final Http http = Http.get(system);
final ActorMaterializer materializer = ActorMaterializer.create(system);
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = createRoute().flow(system, materializer);
final String address = "0.0.0.0";
final CompletionStage<ServerBinding> binding = http.bindAndHandle(
routeFlow,
ConnectHttp.toHost(address, httpPort),
materializer
);
log().info("HTTP server online at http://{}:{}/...", address, httpPort);
return binding;
}
@Test
public void part1CreateAkkaSearchQuery() {
final ActorSystem system = ActorSystem.create("thing-search");
try {
final String filter = "or(eq(attributes/counter,1), eq(attributes/counter,2))";
// TODO create Akka source of publisher with above filter
final Source<List<Thing>, NotUsed> things = null;
// Verify Results
things.flatMapConcat(Source::from)
.toMat(Sink.seq(), Keep.right())
.run(ActorMaterializer.create(system))
.thenAccept(t -> Assertions.assertThat(t).containsAnyOf(thing1, thing2).doesNotContain(thing3))
.toCompletableFuture()
.join();
} finally {
system.terminate();
}
}
@SuppressWarnings("unused")
private <C extends ConciergeConfig> ConciergeRootActor(final C conciergeConfig,
final ActorRef pubSubMediator,
final EnforcerActorFactory<C> enforcerActorFactory,
final ActorMaterializer materializer) {
pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
final ActorContext context = getContext();
final ShardRegions shardRegions = ShardRegions.of(getContext().getSystem(), conciergeConfig.getClusterConfig());
enforcerActorFactory.startEnforcerActor(context, conciergeConfig, pubSubMediator, shardRegions);
final ActorRef conciergeForwarder = context.findChild(ConciergeForwarderActor.ACTOR_NAME).orElseThrow(() ->
new IllegalStateException("ConciergeForwarder could not be found"));
final ActorRef cleanupCoordinator = startClusterSingletonActor(
EventSnapshotCleanupCoordinator.ACTOR_NAME,
EventSnapshotCleanupCoordinator.props(conciergeConfig.getPersistenceCleanupConfig(), pubSubMediator,
shardRegions));
final ActorRef healthCheckingActor = startHealthCheckingActor(conciergeConfig, cleanupCoordinator);
bindHttpStatusRoute(healthCheckingActor, conciergeConfig.getHttpConfig(), materializer);
}
private void bindHttpStatusRoute(final ActorRef healthCheckingActor, final HttpConfig httpConfig,
final ActorMaterializer materializer) {
String hostname = httpConfig.getHostname();
if (hostname.isEmpty()) {
hostname = LocalHostAddressSupplier.getInstance().get();
log.info("No explicit hostname configured, using HTTP hostname: {}", hostname);
}
final CompletionStage<ServerBinding> binding = Http.get(getContext().system())
.bindAndHandle(createRoute(getContext().system(), healthCheckingActor).flow(getContext().system(),
materializer), ConnectHttp.toHost(hostname, httpConfig.getPort()), materializer);
binding.thenAccept(theBinding -> CoordinatedShutdown.get(getContext().getSystem()).addTask(
CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
log.info("Gracefully shutting down status/health HTTP endpoint..");
return theBinding.terminate(Duration.ofSeconds(1))
.handle((httpTerminated, e) -> Done.getInstance());
})
).exceptionally(failure -> {
log.error(failure, "Something very bad happened: {}", failure.getMessage());
getContext().system().terminate();
return null;
});
}
private MongoHealthChecker() {
final DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
mongoClient = MongoClientWrapper.getBuilder(mongoDbConfig)
.connectionPoolMaxSize(HEALTH_CHECK_MAX_POOL_SIZE)
.build();
/*
* It's important to have the read preferences to primary preferred because the replication is to slow to retrieve
* the inserted document from a secondary directly after inserting it on the primary.
*/
collection = mongoClient.getCollection(TEST_COLLECTION_NAME)
.withReadPreference(ReadPreference.primaryPreferred());
materializer = ActorMaterializer.create(getContext());
}
private <T> Source<List<T>, NotUsed> unfoldBatchedSource(
final String lowerBound,
final ActorMaterializer mat,
final Function<T, String> seedCreator,
final Function<String, Source<T, ?>> sourceCreator) {
return Source.unfoldAsync("",
start -> {
final String actualStart = lowerBound.compareTo(start) >= 0 ? lowerBound : start;
return sourceCreator.apply(actualStart)
.runWith(Sink.seq(), mat)
.thenApply(list -> {
if (list.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(Pair.create(seedCreator.apply(list.get(list.size() - 1)), list));
}
});
})
.withAttributes(Attributes.inputBuffer(1, 1));
}
private AbstractPersistenceOperationsActor(final ActorRef pubSubMediator,
final EntityType entityType,
@Nullable final NamespacePersistenceOperations namespaceOps,
@Nullable final EntityPersistenceOperations entitiesOps,
final PersistenceOperationsConfig persistenceOperationsConfig,
final Collection<Closeable> toCloseWhenStopped) {
this.pubSubMediator = checkNotNull(pubSubMediator, "pub-sub mediator");
this.entityType = checkNotNull(entityType, "entityType");
if (namespaceOps == null && entitiesOps == null) {
throw new IllegalArgumentException("At least one of namespaceOps or entitiesOps must be specified.");
}
this.namespaceOps = namespaceOps;
this.entitiesOps = entitiesOps;
this.toCloseWhenStopped = Collections.unmodifiableCollection(toCloseWhenStopped);
materializer = ActorMaterializer.create(getContext());
delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown();
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
}
@Before
public void before() {
system = ActorSystem.create("AkkaTestSystem");
materializer = ActorMaterializer.create(system);
requireNonNull(mongoResource);
requireNonNull(materializer);
mongoClient = MongoClientWrapper.getBuilder()
.hostnameAndPort(mongoResource.getBindIp(), mongoResource.getPort())
.defaultDatabaseName(getClass().getSimpleName() + "-" + UUID.randomUUID().toString())
.connectionPoolMaxSize(CONNECTION_POOL_MAX_SIZE)
.connectionPoolMaxWaitQueueSize(CONNECTION_POOL_MAX_WAIT_QUEUE_SIZE)
.connectionPoolMaxWaitTime(Duration.ofSeconds(CONNECTION_POOL_MAX_WAIT_TIME_SECS))
.build();
indexInitializerUnderTest = IndexInitializer.of(mongoClient.getDefaultDatabase(), materializer);
indexOperations = IndexOperations.of(mongoClient.getDefaultDatabase());
}
private SourceQueueWithComplete<T> getSourceQueue(final ActorMaterializer materializer) {
// Log stream completion and failure at level ERROR because the stream is supposed to survive forever.
final Attributes streamLogLevels =
Attributes.logLevels(Attributes.logLevelDebug(), Attributes.logLevelError(),
Attributes.logLevelError());
return Source.<T>queue(getBufferSize(), OverflowStrategy.dropNew())
.map(this::incrementDequeueCounter)
.log("graph-actor-stream-1-dequeued", logger)
.withAttributes(streamLogLevels)
.via(Flow.fromFunction(this::beforeProcessMessage))
.log("graph-actor-stream-2-preprocessed", logger)
.withAttributes(streamLogLevels)
.via(processMessageFlow())
.log("graph-actor-stream-3-processed", logger)
.withAttributes(streamLogLevels)
.to(processedMessageSink())
.run(materializer);
}
@SuppressWarnings("unused")
private HttpPublisherActor(final Connection connection, final HttpPushFactory factory) {
super(connection);
this.factory = factory;
final ActorSystem system = getContext().getSystem();
final ConnectionConfig connectionConfig =
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(system.settings().config()))
.getConnectionConfig();
config = connectionConfig.getHttpPushConfig();
materializer = ActorMaterializer.create(getContext());
sourceQueue =
Source.<Pair<HttpRequest, HttpPushContext>>queue(config.getMaxQueueSize(), OverflowStrategy.dropNew())
.viaMat(factory.createFlow(system, log), Keep.left())
.toMat(Sink.foreach(this::processResponse), Keep.left())
.run(materializer);
}
@SuppressWarnings("unused")
private StreamingActor(final DittoProtocolSub dittoProtocolSub,
final ActorRef commandRouter,
final JwtAuthenticationFactory jwtAuthenticationFactory,
final StreamingConfig streamingConfig,
final HeaderTranslator headerTranslator,
final ActorRef pubSubMediator,
final ActorRef conciergeForwarder) {
this.dittoProtocolSub = dittoProtocolSub;
this.commandRouter = commandRouter;
this.streamingConfig = streamingConfig;
this.headerTranslator = headerTranslator;
streamingSessionsCounter = DittoMetrics.gauge("streaming_sessions_count");
jwtValidator = jwtAuthenticationFactory.getJwtValidator();
jwtAuthenticationResultProvider = jwtAuthenticationFactory.newJwtAuthenticationResultProvider();
subscriptionManagerProps =
SubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), pubSubMediator, conciergeForwarder,
ActorMaterializer.create(getContext()));
scheduleScrapeStreamSessionsCounter();
}
/**
* Extract a cursor from a {@code QueryThings} command if any exists.
*
* @param queryThings the command.
* @param materializer materializer of actors that will extract the cursor.
* @return source of an optional cursor if the command has no cursor or has a valid cursor; a failed source if the
* command has an invalid cursor.
*/
static Source<Optional<ThingsSearchCursor>, NotUsed> extractCursor(final QueryThings queryThings,
final ActorMaterializer materializer) {
return catchExceptions(queryThings.getDittoHeaders(), () -> {
final List<Option> options = getOptions(queryThings);
final List<CursorOption> cursorOptions = findAll(CursorOption.class, options);
final List<LimitOption> limitOptions = findAll(LimitOption.class, options);
final Optional<InvalidOptionException> sizeOptionError = checkSizeOption(options, queryThings);
if (sizeOptionError.isPresent()) {
return Source.failed(sizeOptionError.get());
} else if (cursorOptions.isEmpty()) {
return Source.single(Optional.empty());
} else if (cursorOptions.size() > 1) {
// there may not be 2 or more cursor options in 1 command.
return Source.failed(invalidCursor("There may not be more than 1 'cursor' option.", queryThings));
} else if (!limitOptions.isEmpty()) {
return Source.failed(invalidCursor(LIMIT_OPTION_FORBIDDEN, queryThings));
} else {
return decode(cursorOptions.get(0).getCursor(), materializer)
.flatMapConcat(cursor -> cursor.checkCursorValidity(queryThings, options)
.<Source<Optional<ThingsSearchCursor>, NotUsed>>map(Source::failed)
.orElse(Source.single(Optional.of(cursor))));
}
});
}
public static void main(final String[] args) throws Exception {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Http http = getHttp(system);
testSync(http, materializer);
testAsync(http, materializer);
Await.result(system.terminate(), Duration.create(15, TimeUnit.SECONDS));
}
private static ActorMaterializer run(ActorSystem system, ActorRef callContextProvider, ActorRef metricsService,
Source<Message, NotUsed> source, Sink<ProducerRecord<String, String>, NotUsed> sink,
Runnable applicationReplacedHandler) {
final Function<Throwable, Supervision.Directive> supervisorStrategy = t -> {
system.log().error(t, t.getMessage());
return Supervision.resume();
};
final Config kafkaConfig = ConfigFactory.load().getConfig(SERVICE).getConfig(KAFKA);
final String commandsTopic = kafkaConfig.getString(COMMANDS_TOPIC);
final String eventsAndResponsesTopic = kafkaConfig.getString(EVENTS_AND_RESPONSES_TOPIC);
final ActorMaterializer materializer = ActorMaterializer.create(
ActorMaterializerSettings.create(system).withSupervisionStrategy(supervisorStrategy),
system);
source
//.throttle(4 * 13, Duration.ofSeconds(1)) // Note: We die right now for calls/s >= 4.8
.wireTap(Sink.foreach(msg -> gatherMetrics(msg, metricsService, callContextProvider)))
.flatMapConcat((msg) -> generateProducerRecordFromEvent(commandsTopic, eventsAndResponsesTopic, msg, callContextProvider, system.log(),
applicationReplacedHandler))
.log(">>> ARI EVENT", record -> record.value()).withAttributes(LOG_LEVELS)
.to(sink)
.run(materializer);
return materializer;
}
private static ActorMaterializer runAriCommandResponseProcessor(
Config kafkaConfig,
ActorSystem system,
ActorRef callContextProvider,
ActorRef metricsService) {
final ConsumerSettings<String, String> consumerSettings = ConsumerSettings
.create(system, new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS))
.withGroupId(kafkaConfig.getString(CONSUMER_GROUP))
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final ProducerSettings<String, String> producerSettings = ProducerSettings
.create(system, new StringSerializer(), new StringSerializer())
.withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS));
final Source<ConsumerRecord<String, String>, NotUsed> source = RestartSource.withBackoff(
Duration.of(5, ChronoUnit.SECONDS),
Duration.of(10, ChronoUnit.SECONDS),
0.2,
() -> Consumer
.plainSource(consumerSettings, Subscriptions.topics(kafkaConfig.getString(COMMANDS_TOPIC)))
.mapMaterializedValue(control -> NotUsed.getInstance())
);
final Sink<ProducerRecord<String, String>, NotUsed> sink = Producer
.plainSink(producerSettings)
.mapMaterializedValue(done -> NotUsed.getInstance());
return AriCommandResponseKafkaProcessor.commandResponseProcessing()
.on(system)
.withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
.withCallContextProvider(callContextProvider)
.withMetricsService(metricsService)
.from(source)
.to(sink)
.run();
}
@DisplayName("Verify processing of both channel and playback events results in the expected kafka producer record")
@ParameterizedTest
@ValueSource(strings = { stasisStartEvent, playbackFinishedEvent, recordingFinishedEvent })
void generateProducerRecordFromAllAriMessageTypes(String ariEvent) {
new TestKit(system)
{
{
final Future<Source<ProducerRecord<String, String>, NotUsed>> wsToKafkaProcessor = Future.of(
() -> AriEventProcessing
.generateProducerRecordFromEvent(fakeCommandsTopic, fakeEventsAndResponsesTopic, new Strict(ariEvent), getRef(), system.log(), genApplicationReplacedHandler.apply(getRef()))
);
expectMsgClass(ProvideCallContext.class);
reply(new CallContextProvided("CALL_CONTEXT"));
final ProducerRecord<String, String> record = wsToKafkaProcessor
.flatMap(source -> Future.fromCompletableFuture(source.runWith(
Sink.last(),
ActorMaterializer.create(ActorMaterializerSettings.create(system), system))
.toCompletableFuture())
)
.await()
.get();
assertThat(record.key(), is("CALL_CONTEXT"));
assertThat(record.topic(), is(fakeEventsAndResponsesTopic));
}
};
}
private void dumpSourceToStdOut(Source<?,NotUsed> src) throws InterruptedException, ExecutionException {
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);
final CompletionStage<Done> done = src.runWith(Sink.foreach(a -> System.out.println(a)),materializer);
done.thenRun(()->system.terminate());
// Make it happen
done.toCompletableFuture().get();
}
private String dumpSourceToString(Source<?,NotUsed> f) throws InterruptedException, ExecutionException {
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);
StringBuilder s = new StringBuilder();
final CompletionStage<Done> done = f.runWith(Sink.foreach(a -> s.append(a)),materializer);
done.thenRun(()->system.terminate());
done.toCompletableFuture().get();
return s.toString();
}
@Lazy(false)
@Bean
MessageConsumer createAkkaMessageConsumer(Flux<String> messageFlux, ActorSystem actorSystem) {
MessageConsumer messageConsumer = new MessageConsumer("Akka");
// using Akka Streams to consume messages:
ActorMaterializer mat = ActorMaterializer.create(actorSystem);
Source.fromPublisher(messageFlux)
.buffer(100, OverflowStrategy.backpressure()) // 100 max buffer
.to(Sink.foreach(msg -> messageConsumer.accept(msg)))
.run(mat);
return messageConsumer;
}
private static CompletionStage<String> getResponseBody(final HttpResponse response,
final ActorMaterializer materializer) {
return response.entity()
.toStrict(READ_BODY_TIMEOUT_MS, materializer)
.thenApply(HttpEntity.Strict::getData)
.thenApply(ByteString::utf8String);
}
@BeforeClass
public static void setup() {
clientFactory = LagomClientFactory.create("integration-test", StreamIT.class.getClassLoader());
// One of the clients can use the service locator, the other can use the service gateway, to test them both.
system = ActorSystem.create();
mat = ActorMaterializer.create(system);
}
public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
MetricsService metricsService, String bootstrap_servers, String topic) {
super(actorSystem, sagaShardRegionActor, metricsService);
// init consumer
final Materializer materializer = ActorMaterializer.create(actorSystem);
final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
final ConsumerSettings<String, String> consumerSettings =
ConsumerSettings
.create(consumerConfig, new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(bootstrap_servers)
.withGroupId(groupId)
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(20, event -> {
BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
if (LOG.isDebugEnabled()) {
LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId());
}
return sendSagaActor(bean).thenApply(done -> event.committableOffset());
})
.batch(
100,
ConsumerMessage::createCommittableOffsetBatch,
ConsumerMessage.CommittableOffsetBatch::updated
)
.mapAsync(20, offset -> offset.commitJavadsl())
.to(Sink.ignore())
.run(materializer);
}
public static void main(String[] args) throws IOException {
ActorSystem actorSystem = ActorSystem.create(CLUSTER_NAME);
actorSystem.actorOf(SimpleClusterListener.props());
final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
Cluster cluster = Cluster.get(actorSystem);
List<Address> addresses = Arrays.asList(System.getenv().get("SEED_NODES").split(","))
.stream()
.map(ip -> new Address("akka.tcp", CLUSTER_NAME, ip, 2551))
.collect(Collectors.toList());
cluster.joinSeedNodes(addresses);
}
/**
* Starts Prometheus HTTP endpoint on which Prometheus may scrape the data.
*/
private void startKamonPrometheusHttpEndpoint(final ActorSystem actorSystem) {
final MetricsConfig metricsConfig = serviceSpecificConfig.getMetricsConfig();
if (metricsConfig.isPrometheusEnabled() && null != prometheusReporter) {
final String prometheusHostname = metricsConfig.getPrometheusHostname();
final int prometheusPort = metricsConfig.getPrometheusPort();
final ActorMaterializer materializer = createActorMaterializer(actorSystem);
final Route prometheusReporterRoute = PrometheusReporterRoute
.buildPrometheusReporterRoute(prometheusReporter);
final CompletionStage<ServerBinding> binding = Http.get(actorSystem)
.bindAndHandle(prometheusReporterRoute.flow(actorSystem, materializer),
ConnectHttp.toHost(prometheusHostname, prometheusPort), materializer);
binding.thenAccept(theBinding -> CoordinatedShutdown.get(actorSystem).addTask(
CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_prometheus_http_endpoint", () -> {
logger.info("Gracefully shutting down Prometheus HTTP endpoint ...");
// prometheus requests don't get the luxury of being processed a long time after shutdown:
return theBinding.terminate(Duration.ofSeconds(1))
.handle((httpTerminated, e) -> Done.getInstance());
})
).exceptionally(failure -> {
logger.error("Kamon Prometheus HTTP endpoint could not be started: {}", failure.getMessage(), failure);
logger.error("Terminating ActorSystem!");
actorSystem.terminate();
return null;
});
}
}
/**
* Creates Akka configuration object Props for this actor.
*
* @param conciergeConfig the config of Concierge.
* @param pubSubMediator the PubSub mediator Actor.
* @param enforcerActorFactory factory for creating sharded enforcer actors.
* @param materializer the materializer for the Akka actor system.
* @return the Akka configuration Props object.
* @throws NullPointerException if any argument is {@code null}.
*/
public static <C extends ConciergeConfig> Props props(final C conciergeConfig,
final ActorRef pubSubMediator,
final EnforcerActorFactory<C> enforcerActorFactory,
final ActorMaterializer materializer) {
checkNotNull(conciergeConfig, "config of Concierge");
checkNotNull(pubSubMediator, "pub-sub mediator");
checkNotNull(enforcerActorFactory, "EnforcerActor factory");
checkNotNull(materializer, "ActorMaterializer");
return Props.create(ConciergeRootActor.class, conciergeConfig, pubSubMediator, enforcerActorFactory,
materializer);
}
@Before
public void before() {
actorSystem = ActorSystem.create(getClass().getSimpleName(),
ConfigFactory.parseString("search-dispatcher {\n" +
" type = PinnedDispatcher\n" +
" executor = \"thread-pool-executor\"\n" +
"}"));
materializer = ActorMaterializer.create(actorSystem);
readPersistence = provideReadPersistence();
writePersistence = provideWritePersistence();
thingsCollection = mongoClient.getDefaultDatabase().getCollection(PersistenceConstants.THINGS_COLLECTION_NAME);
}
@Override
protected Props getMainRootActorProps(final ConciergeConfig serviceSpecificConfig, final ActorRef pubSubMediator,
final ActorMaterializer materializer) {
return ConciergeRootActor.props(serviceSpecificConfig, pubSubMediator, new DefaultEnforcerActorFactory(),
materializer);
}