com.mongodb.client.model.BulkWriteOptions#akka.stream.javadsl.Source源码实例Demo

下面列出了com.mongodb.client.model.BulkWriteOptions#akka.stream.javadsl.Source 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: ari-proxy   文件: AriEventProcessing.java
/**
 * Wraps an ARI message in an {@code AriMessageEnvelope}, serializes the envelope with {@code writer} and
 * offers the serialized form as a single-element source of Kafka producer records.
 *
 * @param kafkaCommandsTopic topic name passed to the envelope
 * @param kafkaEventsAndResponsesTopic topic the producer record is addressed to
 * @param type message type passed to the envelope; logged at debug level after serialization
 * @param log logging adapter used for the debug output
 * @param callContext call context passed to the envelope and used as the record key
 * @param messageBody JSON payload passed to the envelope
 * @return the outcome of the serialization attempt, mapped to a source emitting exactly one record
 */
private static Try<Source<ProducerRecord<String, String>, NotUsed>> createSource(
		String kafkaCommandsTopic,
		String kafkaEventsAndResponsesTopic,
		AriMessageType type,
		LoggingAdapter log,
		String callContext,
		JsonNode messageBody) {

	final AriMessageEnvelope ariEnvelope =
			new AriMessageEnvelope(type, kafkaCommandsTopic, messageBody, callContext);

	return Try.of(() -> writer.writeValueAsString(ariEnvelope))
			.map(serializedEnvelope -> {
				log.debug("[ARI MESSAGE TYPE] {}", ariEnvelope.getType());
				// key = call context, value = serialized envelope
				final ProducerRecord<String, String> record =
						new ProducerRecord<>(kafkaEventsAndResponsesTopic, callContext, serializedEnvelope);
				return Source.single(record);
			});
}
 
/**
 * Streams a {@code PolicyReferenceTag} for every document whose policy ID field matches one of the keys of
 * {@code policyRevisions}.
 *
 * @param policyRevisions policy IDs to look for, mapped to the revision to put into the emitted tags.
 * @return source of policy reference tags; documents whose policy ID has no revision in the map are dropped.
 */
@Override
public Source<PolicyReferenceTag, NotUsed> getPolicyReferenceTags(final Map<PolicyId, Long> policyRevisions) {
    // match on the string form of every requested policy ID
    final Bson policyIdFilter = in(PersistenceConstants.FIELD_POLICY_ID,
            policyRevisions.keySet().stream().map(String::valueOf).collect(Collectors.toSet()));

    // only the thing ID and the policy ID are read below
    final Document idAndPolicyIdOnly = new Document()
            .append(PersistenceConstants.FIELD_ID, new BsonInt32(1))
            .append(PersistenceConstants.FIELD_POLICY_ID, new BsonInt32(1));

    final Publisher<Document> matchingDocuments = collection.find(policyIdFilter).projection(idAndPolicyIdOnly);

    return Source.fromPublisher(matchingDocuments)
            .mapConcat(document -> {
                final ThingId thingId = ThingId.of(document.getString(PersistenceConstants.FIELD_ID));
                final PolicyId policyId =
                        PolicyId.of(document.getString(PersistenceConstants.FIELD_POLICY_ID));
                final Long revision = policyRevisions.get(policyId);
                if (revision == null) {
                    return Collections.emptyList();
                }
                return Collections.singletonList(PolicyReferenceTag.of(thingId, PolicyTag.of(policyId, revision)));
            });
}
 
源代码3 项目: ditto   文件: ReconnectActorTest.java
/**
 * Starts a {@code ReconnectActor} with three persistence IDs, two built with
 * {@code ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX} and one prefixed with {@code "invalid:"},
 * and expects the probe to receive a {@code RetrieveConnectionStatus} for the first and the third
 * connection ID, in that order.
 */
@Test
public void testRecoverConnections() {
    new TestKit(actorSystem) {{
        final TestProbe probe = new TestProbe(actorSystem);
        final ConnectionId connectionId1 = ConnectionId.of("connection-1");
        final ConnectionId connectionId2 = ConnectionId.of("connection-2");
        final ConnectionId connectionId3 = ConnectionId.of("connection-3");
        // the supplier stands in for the stream of persistence IDs handed to the actor
        final Props props = ReconnectActor.props(probe.ref(),
                () -> Source.from(Arrays.asList(
                        ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX + connectionId1,
                        "invalid:" + connectionId2,
                        ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX + connectionId3)));

        actorSystem.actorOf(props);

        // the second expected message is for connection 3: nothing is expected for the "invalid:" entry
        final RetrieveConnectionStatus msg1 = probe.expectMsgClass(RetrieveConnectionStatus.class);
        assertThat((CharSequence) msg1.getConnectionEntityId()).isEqualTo(connectionId1);
        final RetrieveConnectionStatus msg2 = probe.expectMsgClass(RetrieveConnectionStatus.class);
        assertThat((CharSequence) msg2.getConnectionEntityId()).isEqualTo(connectionId3);
    }};
}
 
源代码4 项目: ditto   文件: MongoReadJournal.java
/**
 * Builds a source of batches by repeatedly running the source produced by {@code sourceCreator} to
 * completion and collecting its elements into a list. The unfold state starts as the empty string; after
 * each non-empty batch the next state is computed from the batch's last element via {@code seedCreator}.
 * The stream ends with the first empty batch.
 *
 * @param lowerBound smallest start value ever passed to {@code sourceCreator}.
 * @param mat materializer used to run each batch source.
 * @param seedCreator computes the next unfold state from the last element of a batch.
 * @param sourceCreator creates the source of one batch from a start value.
 * @param <T> type of the batch elements.
 * @return source emitting one list per non-empty batch.
 */
private <T> Source<List<T>, NotUsed> unfoldBatchedSource(
        final String lowerBound,
        final ActorMaterializer mat,
        final Function<T, String> seedCreator,
        final Function<String, Source<T, ?>> sourceCreator) {

    return Source.unfoldAsync("",
            seed -> {
                // never start below the configured lower bound
                final String effectiveStart;
                if (lowerBound.compareTo(seed) >= 0) {
                    effectiveStart = lowerBound;
                } else {
                    effectiveStart = seed;
                }
                return sourceCreator.apply(effectiveStart)
                        .runWith(Sink.seq(), mat)
                        .thenApply(batch -> {
                            if (!batch.isEmpty()) {
                                // the last element of this batch determines where the next one starts
                                return Optional.of(
                                        Pair.create(seedCreator.apply(batch.get(batch.size() - 1)), batch));
                            } else {
                                return Optional.empty();
                            }
                        });
            })
            .withAttributes(Attributes.inputBuffer(1, 1));
}
 
源代码5 项目: ditto   文件: SubscriptionActorTest.java
/**
 * Connects a subscription actor to a source that emits one page and then fails, requests one page, and
 * expects a {@code SubscriptionHasNextPage} for that page followed by a {@code SubscriptionFailed}
 * carrying the mocked error.
 */
@Test
public void partialFailure() {
    // comment the next line to get logs for debugging
    actorSystem.eventStream().setLogLevel(Attributes.logLevelOff());
    new TestKit(actorSystem) {{
        final ActorRef underTest = watch(newSubscriptionActor(Duration.ofMinutes(1L), this));
        // the subscription ID used in all messages below is the actor's path name
        final String subscriptionId = underTest.path().name();
        final DittoRuntimeException error =
                InvalidRqlExpressionException.fromMessage("mock error", DittoHeaders.empty());
        // not possible to use Source.concat -- it forces the second source immediately.
        final Source<JsonArray, NotUsed> lazilyFailingSource =
                Source.from(List.of(Source.single(JsonArray.of(1)),
                        Source.lazily(() -> Source.<JsonArray>failed(error))))
                        .flatMapConcat(x -> x);
        connect(underTest, lazilyFailingSource, this);
        // request a single page; the failure is expected right after that page
        underTest.tell(RequestFromSubscription.of(subscriptionId, 1L, DittoHeaders.empty()), getRef());
        expectMsg(SubscriptionHasNextPage.of(subscriptionId, JsonArray.of(1), DittoHeaders.empty()));
        expectMsg(SubscriptionFailed.of(subscriptionId, error, DittoHeaders.empty()));
    }};
}
 
源代码6 项目: ditto   文件: BackgroundSyncStream.java
/**
 * Decides whether the search index entry of a thing has to be updated by comparing it with the metadata
 * from the persistence. The indexed metadata is emitted when the persisted thing revision is greater than
 * the indexed one or when the policy IDs differ. If both policy IDs are present and equal, the decision is
 * delegated to {@code retrievePolicyRevisionAndEmitMismatch}. If both are absent, nothing is emitted.
 * Precondition: the thing IDs are identical and the search index entry is outside the tolerance window.
 *
 * @param persisted metadata from the snapshot store of the persistence.
 * @param indexed metadata from the search index with the same thing ID.
 * @return source of a metadata if the persistence and search index are inconsistent, or an empty source otherwise.
 */
private Source<Metadata, NotUsed> emitUnlessConsistent(final Metadata persisted, final Metadata indexed) {
    if (persisted.getThingRevision() > indexed.getThingRevision()) {
        return Source.single(indexed).log("RevisionMismatch");
    }

    final Optional<PolicyId> policyIdInPersistence = persisted.getPolicyId();
    final Optional<PolicyId> policyIdInIndex = indexed.getPolicyId();
    if (!policyIdInPersistence.equals(policyIdInIndex)) {
        return Source.single(indexed).log("PolicyIdMismatch");
    }

    if (!policyIdInPersistence.isPresent()) {
        // neither side references a policy, so there is nothing left to compare
        return Source.empty();
    }

    // same non-empty policy ID on both sides: the policy revision decides
    return retrievePolicyRevisionAndEmitMismatch(policyIdInPersistence.get(), indexed);
}
 
源代码7 项目: ditto   文件: SearchSource.java
/**
 * Produces the JSON representation of one search hit, paired with its thing ID.
 * When {@code thingIdOnly} is set, the JSON consists of the ID field alone and no thing is retrieved.
 * Otherwise the thing is retrieved with the configured {@code fields}. If retrieval fails with a
 * {@code ThingNotAccessibleException}, a {@code ThingsOutOfSync} message is published via the pub-sub
 * mediator and the element is dropped.
 *
 * @param thingId ID of the thing to retrieve.
 * @return source of at most one pair of thing ID and thing JSON.
 */
private Source<Pair<String, JsonObject>, NotUsed> retrieveThingForElement(final String thingId) {
    if (!thingIdOnly) {
        return retrieveThing(thingId, fields)
                .map(retrievedJson -> Pair.create(thingId, retrievedJson))
                .recoverWithRetries(1,
                        new PFBuilder<Throwable, Graph<SourceShape<Pair<String, JsonObject>>, NotUsed>>()
                                .match(ThingNotAccessibleException.class, notAccessible -> {
                                    // the index knows a thing that cannot be accessed: report it as out-of-sync
                                    final ThingsOutOfSync outOfSyncNotification =
                                            ThingsOutOfSync.of(Collections.singletonList(ThingId.of(thingId)),
                                                    getDittoHeaders());

                                    pubSubMediator.tell(
                                            DistPubSubAccess.publishViaGroup(ThingsOutOfSync.TYPE,
                                                    outOfSyncNotification),
                                            ActorRef.noSender());
                                    return Source.empty();
                                })
                                .build()
                );
    }

    final JsonObject jsonWithIdOnly = JsonObject.newBuilder().set(Thing.JsonFields.ID, thingId).build();
    return Source.single(Pair.create(thingId, jsonWithIdOnly));
}
 
源代码8 项目: ditto   文件: MongoHealthChecker.java
/**
 * Probes the collection by inserting a document with a random ID, finding it again and deleting it.
 *
 * @return a stage holding an empty optional when exactly one delete count of 1 was observed; otherwise the
 * stream error or an {@code IllegalStateException} describing the unexpected result.
 */
private CompletionStage<Optional<Throwable>> generateStatusResponse() {

    final String probeId = UUID.randomUUID().toString();

    return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, probeId)))
            .flatMapConcat(inserted ->
                    Source.fromPublisher(collection.find(eq(ID_FIELD, probeId))).flatMapConcat(found ->
                            Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, probeId)))
                                    .map(DeleteResult::getDeletedCount)
                    )
            )
            .runWith(Sink.seq(), materializer)
            .handle((deletedCounts, failure) -> {
                if (error(failure)) {
                    return Optional.of(failure);
                }
                if (Objects.equals(deletedCounts, Collections.singletonList(1L))) {
                    return Optional.empty();
                }
                final String message = "Expect 1 document inserted and deleted. Found: " + deletedCounts;
                return Optional.of(new IllegalStateException(message));
            });
}

/** Tells whether the stream completed with a failure. */
private static boolean error(final Throwable failure) {
    return failure != null;
}
 
源代码9 项目: ditto   文件: MongoThingsSearchPersistence.java
/**
 * Counts the documents matching {@code query}, honouring its skip and limit and the configured
 * {@code maxQueryTime}.
 *
 * @param query the query to count results for; must not be null.
 * @param authorizationSubjectIds subjects passed on to the filter creation, may be null.
 * @return source of the count; errors pass through {@code handleMongoExecutionTimeExceededException()}.
 */
@Override
public Source<Long, NotUsed> count(final Query query,
        @Nullable final List<String> authorizationSubjectIds) {

    checkNotNull(query, "query");

    final BsonDocument filter = getMongoFilter(query, authorizationSubjectIds);
    log.debug("count with query filter <{}>.", filter);

    final CountOptions options =
            new CountOptions()
                    .skip(query.getSkip())
                    .limit(query.getLimit())
                    .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS);

    return Source.fromPublisher(collection.count(filter, options))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("count");
}
 
源代码10 项目: ditto   文件: MongoTimestampPersistence.java
/**
 * Creates the capped collection {@code collectionName} in {@code database} if it doesn't exist yet and
 * provides it as a source that is pre-materialized through a {@code BroadcastHub}.
 *
 * @param database The database to use.
 * @param collectionName The name of the capped collection that should be created.
 * @param cappedCollectionSizeInBytes The size in bytes of the collection that should be created.
 * @param materializer The actor materializer to pre-materialize the restart source.
 * @return A source emitting the created or retrieved collection.
 */
private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes,
        final ActorMaterializer materializer) {

    final Source<Success, NotUsed> createCollectionSource =
            repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes);

    // once creation succeeded, keep emitting the same collection handle indefinitely
    final Source<MongoCollection, NotUsed> infiniteCollectionSource =
            createCollectionSource.map(success -> database.getCollection(collectionName))
                    .flatMapConcat(Source::repeat);

    final Source<MongoCollection, NotUsed> restartSource =
            RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0, () -> infiniteCollectionSource);

    // pre-materialize source with BroadcastHub so that a successfully obtained capped collection is reused
    // until the stream fails, whereupon it gets recreated with backoff.
    return restartSource.runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
}
 
源代码11 项目: ditto   文件: MongoTimestampPersistence.java
/**
 * Builds a source that, on each materialization, tries to create the capped collection
 * {@code collectionName} holding at most one document. A {@code MongoCommandException} accepted by
 * {@code isCollectionAlreadyExistsError} is turned into a success.
 *
 * @param database the database to create the collection in.
 * @param collectionName name of the collection to create.
 * @param cappedCollectionSizeInBytes size limit of the capped collection in bytes.
 * @return source emitting a success marker once the collection was created or found to exist already.
 */
private static Source<Success, NotUsed> repeatableCreateCappedCollectionSource(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes) {

    final CreateCollectionOptions cappedSingleDocumentOptions =
            new CreateCollectionOptions()
                    .capped(true)
                    .sizeInBytes(cappedCollectionSizeInBytes)
                    .maxDocuments(1);

    // the publisher is requested lazily so that every materialization issues a new create command
    return Source.lazily(
            () -> Source.fromPublisher(database.createCollection(collectionName, cappedSingleDocumentOptions)))
            .mapMaterializedValue(ignored -> NotUsed.getInstance())
            .withAttributes(Attributes.inputBuffer(1, 1))
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<Success, NotUsed>>()
                    .match(MongoCommandException.class,
                            MongoTimestampPersistence::isCollectionAlreadyExistsError,
                            alreadyExists -> Source.single(Success.SUCCESS))
                    .build());
}
 
源代码12 项目: ditto   文件: SearchActor.java
/**
 * Appends a pass-through stage to {@code source} that logs every element at debug level after setting the
 * correlation ID of {@code dittoHeaders} on the logger.
 *
 * @param source the source delivering the persistence result.
 * @param dittoHeaders headers providing the correlation ID for the log entry.
 * @param <T> type of the elements.
 * @return the source with the logging stage attached; elements are passed on unchanged.
 */
private <T> Source<T, NotUsed> processSearchPersistenceResult(Source<T, NotUsed> source,
        final DittoHeaders dittoHeaders) {

    final Flow<T, T, NotUsed> debugLoggingFlow =
            Flow.fromFunction(persistenceResult -> {
                // we know that the source provides exactly one ResultList
                LogUtil.enhanceLogWithCorrelationId(log, dittoHeaders.getCorrelationId());
                log.debug("Persistence returned: {}", persistenceResult);
                return persistenceResult;
            });

    return source.via(debugLoggingFlow);
}
 
源代码13 项目: ditto   文件: SubscriptionActorTest.java
/**
 * Connects a subscription actor to a source of two pages, requests two pages and expects both pages
 * followed by a {@code SubscriptionComplete}.
 */
@Test
public void twoPages() {
    new TestKit(actorSystem) {{
        final ActorRef subscriptionActor = watch(newSubscriptionActor(Duration.ofMinutes(1L), this));
        // the actor's path name doubles as the subscription ID
        final String subscriptionId = subscriptionActor.path().name();
        final Source<JsonArray, NotUsed> pages = Source.from(List.of(JsonArray.of(1), JsonArray.of(2)));

        connect(subscriptionActor, pages, this);
        subscriptionActor.tell(RequestFromSubscription.of(subscriptionId, 2L, DittoHeaders.empty()), getRef());

        expectMsg(SubscriptionHasNextPage.of(subscriptionId, JsonArray.of(1), DittoHeaders.empty()));
        expectMsg(SubscriptionHasNextPage.of(subscriptionId, JsonArray.of(2), DittoHeaders.empty()));
        expectMsg(SubscriptionComplete.of(subscriptionId, DittoHeaders.empty()));
    }};
}
 
源代码14 项目: ditto   文件: MongoThingsSearchPersistence.java
/**
 * Runs a find on the collection for {@code query} with sort, skip, projection and an optional index hint,
 * and optionally restricts the number of results and the query time.
 *
 * @param query the query to execute; must not be null.
 * @param authorizationSubjectIds subjects passed on to the filter creation.
 * @param namespaces namespaces used to look up an index hint, may be null.
 * @param limit maximum number of documents; no limit is set if null.
 * @param maxQueryTime maximum query time, applied in whole seconds; not set if null.
 * @return source of the matching documents.
 */
private Source<Document, NotUsed> findAllInternal(final Query query, final List<String> authorizationSubjectIds,
        @Nullable final Set<String> namespaces,
        @Nullable final Integer limit,
        @Nullable final Duration maxQueryTime) {

    checkNotNull(query, "query");

    final BsonDocument filter = getMongoFilter(query, authorizationSubjectIds);
    if (log.isDebugEnabled()) {
        log.debug("findAll with query filter <{}>.", filter);
    }

    final Bson sort = getMongoSort(query);

    final int skip = query.getSkip();
    final Bson sortFieldProjection = GetSortBsonVisitor.projections(query.getSortOptions());

    FindPublisher<Document> publisher =
            collection.find(filter, Document.class)
                    .hint(hints.getHint(namespaces).orElse(null))
                    .sort(sort)
                    .skip(skip)
                    .projection(sortFieldProjection);

    // limit and max time are optional and only applied when given
    if (limit != null) {
        publisher = publisher.limit(limit);
    }
    if (maxQueryTime != null) {
        publisher = publisher.maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS);
    }

    return Source.fromPublisher(publisher);
}
 
源代码15 项目: ditto   文件: PolicyEventForwarder.java
/**
 * Turns the result of a dump into a source of policy reference tags.
 * A {@code Map} is handed to the persistence. Anything else is logged, as an error for a {@code Throwable}
 * and as a warning otherwise, and yields an empty source.
 *
 * @param dumpResult the dump result: expected to be a map of policy IDs to revisions.
 * @return the policy reference tags for the dumped policies, or an empty source.
 */
@SuppressWarnings("unchecked")
private Source<PolicyReferenceTag, NotUsed> mapDumpResult(final Object dumpResult) {
    if (dumpResult instanceof Map) {
        // unchecked: the key and value types of the map cannot be verified at runtime
        return persistence.getPolicyReferenceTags((Map<PolicyId, Long>) dumpResult);
    }

    if (dumpResult instanceof Throwable) {
        log.error((Throwable) dumpResult, "dump failed");
    } else {
        log.warning("Unexpected dump result: <{}>", dumpResult);
    }
    return Source.empty();
}
 
源代码16 项目: ditto   文件: StatsRoute.java
/**
 * Collects the request payload into one UTF-8 string, converts it to a DevOps command and sends the
 * command to a newly created per-request actor, which is expected to complete the HTTP response future.
 *
 * @param ctx the request context handed to the per-request actor.
 * @param payloadSource the chunks of the request body.
 * @param requestJsonToCommandFunction converts the request body string into the command to send.
 * @return a route completing with the future HTTP response.
 */
private Route handleDevOpsPerRequest(final RequestContext ctx,
        final Source<ByteString, ?> payloadSource,
        final Function<String, DevOpsCommand<?>> requestJsonToCommandFunction) {
    final CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();

    // concatenate all chunks before decoding so that the whole body is parsed as one string
    final Source<DevOpsCommand<?>, ?> commandSource = payloadSource
            .fold(ByteString.empty(), (collected, chunk) -> collected.concat(chunk))
            .map(body -> body.utf8String())
            .map(requestJsonToCommandFunction);

    commandSource
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, responseFuture),
                    AbstractHttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);

    return completeWithFuture(responseFuture);
}
 
/**
 * Drops the given collection, retrying up to 20 times and calling {@code backoff()} after each failed
 * attempt.
 *
 * @param collection the collection to drop.
 * @throws RuntimeException the exception of the last attempt if all attempts failed.
 */
private void dropCollectionWithBackoff(final MongoCollection<Document> collection) {
    RuntimeException mostRecentFailure = null;
    int attempt = 0;
    while (attempt < 20) {
        try {
            waitFor(Source.fromPublisher(collection.drop()));
            return;
        } catch (final RuntimeException dropFailure) {
            mostRecentFailure = dropFailure;
            backoff();
        }
        ++attempt;
    }
    // only reached when every attempt threw
    throw mostRecentFailure;
}
 
源代码18 项目: ditto   文件: BackgroundSyncActor.java
/**
 * Wraps the sources produced by {@code sourceCreator} via {@code ResumeSource.onFailureWithBackoff}, using
 * the backoff, restart and recovery settings from {@code config}.
 *
 * @param lowerBound the initial seed handed to {@code ResumeSource}; also passed to {@code nextLowerBound}.
 * @param sourceCreator creates a metadata source from a thing ID.
 * @return the source returned by {@code ResumeSource.onFailureWithBackoff}.
 */
private Source<Metadata, NotUsed> wrapAsResumeSource(final ThingId lowerBound,
        final Function<ThingId, Source<Metadata, ?>> sourceCreator) {

    return ResumeSource.onFailureWithBackoff(
            config.getMinBackoff(),
            config.getMaxBackoff(),
            config.getMaxRestarts(),
            config.getRecovery(),
            lowerBound,
            sourceCreator,
            // NOTE(review): presumably the number of most recent elements kept for resumption - confirm
            // against ResumeSource
            1,
            // derive the next seed from the original lower bound and the retained metadata
            lastMetadata -> nextLowerBound(lowerBound, lastMetadata));
}
 
源代码19 项目: ditto   文件: MongoTimestampPersistenceIT.java
/**
 * Sets the timestamp twice and expects the collection to contain exactly one document afterwards.
 */
@Test
public void ensureCollectionIsCapped() throws Exception {
    final MongoCollection<Document> timestampCollection =
            syncPersistence.getCollection()
                    .runWith(Sink.head(), materializer)
                    .toCompletableFuture()
                    .get();

    // write twice: the second write must not increase the document count
    for (int write = 0; write < 2; write++) {
        runBlocking(syncPersistence.setTimestamp(Instant.now()));
    }

    assertThat(runBlocking(Source.fromPublisher(timestampCollection.count()))).containsExactly(1L);
}
 
源代码20 项目: tutorials   文件: DataImporterUnitTest.java
/**
 * Feeds the line "1;9;11;0" through the flow returned by {@code calculateAverage()} and expects the
 * values 5.0 and 5.5 in any order.
 */
@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    //given
    final Flow<String, Double, NotUsed> averageFlow = new DataImporter(actorSystem).calculateAverage();
    final String line = "1;9;11;0";

    //when
    final Source<Double, NotUsed> averages = Source.single(line).via(averageFlow);

    //then
    averages
            .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
            .request(4)
            .expectNextUnordered(5d, 5.5);
}
 
源代码21 项目: ari-proxy   文件: AriEventProcessingTest.java
/**
 * Processes an application-replaced event with the handler created by {@code genApplicationReplacedHandler}
 * and expects the test kit to receive "Shutdown triggered!" and the processing result to be an empty source.
 */
@Test
void checkApplicationReplacedHandlerIsTriggered() {
	new TestKit(system)
	{
		{
			// run the processing inside a Future so that the message expectation below can be checked
			// before the result is awaited
			final Future<Source<ProducerRecord<String, String>, NotUsed>> wsToKafkaProcessor = Future.of(
					() -> AriEventProcessing
							.generateProducerRecordFromEvent(fakeCommandsTopic, fakeEventsAndResponsesTopic, new Strict(applicationReplacedEvent), getRef(), system.log(), genApplicationReplacedHandler.apply(getRef()))
			);
			// the handler is given this test kit's ref, which is where the message is expected
			assertThat(expectMsgClass(String.class), is("Shutdown triggered!"));
			assertThat(wsToKafkaProcessor.await().get(), is(Source.empty()));
		}
	};
}
 
源代码22 项目: ts-reaktive   文件: GroupWhileSpec.java
/**
 * Runs {@code source} into {@code sink} and waits up to 10 seconds for the result.
 *
 * @param source the source to run.
 * @return the collected groups as a {@code Vector}.
 * @throws RuntimeException wrapping the cause if waiting was interrupted, the stream failed or the
 * timeout elapsed. On interruption the thread's interrupt flag is restored before throwing.
 */
private Seq<Seq<Integer>> run(Source<Integer,?> source) {
    try {
        List<Seq<Integer>> result = source.runWith(sink, materializer).toCompletableFuture().get(10, TimeUnit.SECONDS);
        return Vector.ofAll(result);
    } catch (InterruptedException e) {
        // catching InterruptedException clears the flag; restore it so callers can still observe it
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException | TimeoutException e) {
        throw new RuntimeException(e);
    }
}
 
源代码23 项目: lagom-example   文件: StreamIT.java
/**
 * Sends "a", "b" and "c" to the stream service and expects the first three responses to be the
 * corresponding "Hello, x!" greetings, in order.
 */
@Test
public void helloStream() throws Exception {
    // Important to concat our source with a maybe, this ensures the connection doesn't get closed once we've
    // finished feeding our elements in, and then also to take 3 from the response stream, this ensures our
    // connection does get closed once we've received the 3 elements.
    Source<String, ?> response = await(streamService.stream().invoke(
            Source.from(Arrays.asList("a", "b", "c"))
                    .concat(Source.maybe())));
    // collect exactly three responses, then compare them with the expected greetings
    List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
    assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);
}
 
源代码24 项目: ditto   文件: SubscriptionActorTest.java
/**
 * Creates a subscription actor with a zero timeout and expects a {@code SubscriptionFailed} whose error
 * code is that of {@code SubscriptionTimeoutException}.
 */
@Test
public void timeout() {
    new TestKit(actorSystem) {{
        final ActorRef subscriptionActor = watch(newSubscriptionActor(Duration.ZERO, this));
        final Source<JsonArray, NotUsed> singlePage = Source.single(JsonArray.of(1));
        connect(subscriptionActor, singlePage, this);

        // no page is requested: the only expected message is the failure
        final SubscriptionFailed failure = expectMsgClass(SubscriptionFailed.class);
        assertThat(failure.getError().getErrorCode()).isEqualTo(SubscriptionTimeoutException.ERROR_CODE);
    }};
}
 
/**
 * Runs a single-element source into a test probe and expects the element followed by completion.
 */
@Test
public void test_a_source() {
    final Sink<Object, TestSubscriber.Probe<Object>> probeSink = TestSink.probe(system);
    final Source<Object, NotUsed> singleElementSource = Source.single("test");

    final TestSubscriber.Probe<Object> probe = singleElementSource.runWith(probeSink, materializer);
    probe.request(1);
    probe.expectNext("test");
    probe.expectComplete();
}
 
源代码26 项目: ditto   文件: BackgroundSyncStream.java
/**
 * Discover inconsistencies between the persisted and indexed metadata and emit extra/nonexistent/mismatched
 * entries of the search index.
 * The two streams are merged with {@code MergeSortedAsPair} using {@code compareMetadata} as ordering, the
 * merged stream is throttled, and each merged element is passed to {@code filterForInconsistency}.
 *
 * @param metadataFromSnapshots metadata streamed from the things snapshot store.
 * @param metadataFromSearchIndex metadata streamed from the search index.
 * @return source of inconsistent entries.
 */
public Source<Metadata, NotUsed> filterForInconsistencies(final Source<Metadata, ?> metadataFromSnapshots,
        final Source<Metadata, ?> metadataFromSearchIndex) {


    final Comparator<Metadata> comparator = BackgroundSyncStream::compareMetadata;
    // NOTE(review): dummyMetadata() is presumably the filler for the side that has no matching element;
    // confirm against MergeSortedAsPair
    return MergeSortedAsPair.merge(dummyMetadata(), comparator, metadataFromSnapshots, metadataFromSearchIndex)
            // throttle before the per-element check
            .throttle(throttleThroughput, throttlePeriod)
            .flatMapConcat(this::filterForInconsistency)
            // log elements at warning level because out-of-date metadata are detected
            .withAttributes(Attributes.logLevels(
                    Attributes.logLevelWarning(),
                    Attributes.logLevelDebug(),
                    Attributes.logLevelError()));
}
 
/**
 * Streams the order-placed events of the given tag, starting at the given offset, converted to their
 * domain event form and paired with the offset they were read at.
 *
 * @param tag the event tag to read.
 * @param offset the offset to start reading from.
 * @return source of domain events with their offsets; events of other types are skipped.
 */
public Source<Pair<OrderPlaced, Offset>, ?> ordersStream(AggregateEventTag<PortfolioEvent> tag, Offset offset) {
    return persistentEntities.eventStream(tag, offset)
            .filter(eventAndOffset -> eventAndOffset.first() instanceof PortfolioEvent.OrderPlaced)
            .mapAsync(1, eventAndOffset -> {
                // safe cast: the filter above only lets OrderPlaced events through
                final PortfolioEvent.OrderPlaced placedEvent =
                        (PortfolioEvent.OrderPlaced) eventAndOffset.first();
                log.info(String.format("Publishing order %s", placedEvent.getOrderId()));
                final Pair<OrderPlaced, Offset> orderWithOffset =
                        Pair.create(placedEvent.asDomainEvent(), eventAndOffset.second());
                return CompletableFuture.completedFuture(orderWithOffset);
            });
}
 
源代码28 项目: ts-reaktive   文件: AkkaStreams.java
/**
 * Materializes the given source and waits until its first element has arrived (or the source has ended).
 * The returned CompletionStage is then completed with a source representing the full stream: the first
 * element followed by the remaining ones. If the source ends without emitting, the remaining (tail) source
 * is returned as is. There is no timeout on waiting for the first element, so timeouts have to be handled
 * separately on the stream, the returned future, or both.
 *
 * This is useful in cases where you want to "fail early" when handling a stream result. For example, you might want
 * to build an http response based on a stream, but want to set a different status code if the stream fails
 * to emit any element.
 */
public static <T> CompletionStage<Source<T,NotUsed>> awaitOne(Source<T,?> source, Materializer materializer) {
    return source
        .prefixAndTail(1)
        .map(prefixAndTail -> {
            final Source<T, NotUsed> remaining = prefixAndTail.second();
            if (!prefixAndTail.first().isEmpty()) {
                // put the element that was already consumed back in front of the remaining stream
                final T first = prefixAndTail.first().get(0);
                return Source.single(first).concat(remaining);
            }
            return remaining;
        })
        .runWith(Sink.head(), materializer);
}
 
源代码29 项目: ditto   文件: ThingsJournalTestHelper.java
/**
 * Runs the given source to completion and blocks for up to {@code WAIT_TIMEOUT} seconds for its elements.
 *
 * @param publisher the source to run.
 * @param <T> type of the elements.
 * @return all elements emitted by the source.
 * @throws IllegalStateException wrapping the cause if waiting was interrupted, the stream failed or the
 * timeout elapsed. On interruption the thread's interrupt flag is restored before throwing.
 */
private <T> List<T> runBlockingWithReturn(final Source<T, NotUsed> publisher) {
    final CompletionStage<List<T>> done = publisher.runWith(Sink.seq(), mat);
    try {
        return done.toCompletableFuture().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
    } catch (final InterruptedException e) {
        // catching InterruptedException clears the flag; restore it so callers can still observe it
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (final ExecutionException | TimeoutException e) {
        throw new IllegalStateException(e);
    }
}
 
源代码30 项目: ts-reaktive   文件: S3.java
/**
 * Loads the last known written offset from the "_lastOffset" object.
 * The downloaded chunks are concatenated and parsed as a long. Any upstream failure - not only a missing
 * object - is recovered to an offset of 0.
 */
public CompletionStage<Long> loadOffset() {
    return download("_lastOffset")
        .reduce((accumulated, chunk) -> accumulated.concat(chunk))
        .map(content -> Long.parseLong(content.utf8String()))
        .recoverWith(
            new PFBuilder<Throwable, Source<Long,NotUsed>>()
                .matchAny(failure -> Source.single(0L)) // not found -> start at 0
                .build()
        )
        .runWith(Sink.head(), materializer);
}