The following are code examples of akka.stream.javadsl.Source, collected from various open-source projects; you can follow the links to view the full source code on GitHub.
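Before the project snippets, here is a minimal, self-contained sketch of building and running a Source (the system name "demo" and the numbers are placeholders, not from any of the projects below):

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public final class SourceDemo {

    public static void main(final String[] args) {
        final ActorSystem system = ActorSystem.create("demo");
        final ActorMaterializer materializer = ActorMaterializer.create(system);

        // a finite source of integers, doubled and folded into a sum
        final Source<Integer, NotUsed> numbers = Source.range(1, 10);
        numbers.map(i -> i * 2)
                .runWith(Sink.fold(0, Integer::sum), materializer)
                .thenAccept(sum -> {
                    System.out.println("sum: " + sum); // prints sum: 110
                    system.terminate();
                });
    }
}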
private static Try<Source<ProducerRecord<String, String>, NotUsed>> createSource(
        String kafkaCommandsTopic,
        String kafkaEventsAndResponsesTopic,
        AriMessageType type,
        LoggingAdapter log,
        String callContext,
        JsonNode messageBody) {
    final AriMessageEnvelope envelope = new AriMessageEnvelope(
            type,
            kafkaCommandsTopic,
            messageBody,
            callContext
    );

    return Try.of(() -> writer.writeValueAsString(envelope))
            .map(marshalledEnvelope -> {
                log.debug("[ARI MESSAGE TYPE] {}", envelope.getType());
                return Source.single(new ProducerRecord<>(
                        kafkaEventsAndResponsesTopic,
                        callContext,
                        marshalledEnvelope
                ));
            });
}
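To actually publish what createSource produces, the resulting Try can be run against a Kafka producer sink. A sketch using Alpakka Kafka (akka.kafka.ProducerSettings, akka.kafka.javadsl.Producer, org.apache.kafka.common.serialization.StringSerializer); the bootstrap address, the in-scope system and materializer, and the argument names are assumptions, not from the original project:

final ProducerSettings<String, String> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
                .withBootstrapServers("localhost:9092"); // assumed broker address

createSource(commandsTopic, eventsTopic, type, system.log(), callContext, messageBody)
        .forEach(source -> source.runWith(Producer.plainSink(producerSettings), materializer));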
@Override
public Source<PolicyReferenceTag, NotUsed> getPolicyReferenceTags(final Map<PolicyId, Long> policyRevisions) {
    final Bson filter =
            in(PersistenceConstants.FIELD_POLICY_ID, policyRevisions.keySet()
                    .stream()
                    .map(String::valueOf)
                    .collect(Collectors.toSet()));
    final Publisher<Document> publisher =
            collection.find(filter).projection(new Document()
                    .append(PersistenceConstants.FIELD_ID, new BsonInt32(1))
                    .append(PersistenceConstants.FIELD_POLICY_ID, new BsonInt32(1)));
    return Source.fromPublisher(publisher)
            .mapConcat(doc -> {
                final ThingId thingId = ThingId.of(doc.getString(PersistenceConstants.FIELD_ID));
                final String policyIdString = doc.getString(PersistenceConstants.FIELD_POLICY_ID);
                final PolicyId policyId = PolicyId.of(policyIdString);
                final Long revision = policyRevisions.get(policyId);
                if (revision == null) {
                    return Collections.emptyList();
                } else {
                    final PolicyTag policyTag = PolicyTag.of(policyId, revision);
                    return Collections.singletonList(PolicyReferenceTag.of(thingId, policyTag));
                }
            });
}
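Note how mapConcat doubles as a filter here: an empty list drops the element, a singleton list emits a transformed one. The same pattern in isolation (a sketch using java.util collections):

// keep only even numbers, transforming them in the same step
final Source<String, NotUsed> evens =
        Source.from(Arrays.asList(1, 2, 3, 4))
                .mapConcat(i -> i % 2 == 0
                        ? Collections.singletonList("even-" + i)
                        : Collections.<String>emptyList());
// emits "even-2", "even-4"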
@Test
public void testRecoverConnections() {
    new TestKit(actorSystem) {{
        final TestProbe probe = new TestProbe(actorSystem);
        final ConnectionId connectionId1 = ConnectionId.of("connection-1");
        final ConnectionId connectionId2 = ConnectionId.of("connection-2");
        final ConnectionId connectionId3 = ConnectionId.of("connection-3");
        final Props props = ReconnectActor.props(probe.ref(),
                () -> Source.from(Arrays.asList(
                        ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX + connectionId1,
                        "invalid:" + connectionId2,
                        ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX + connectionId3)));

        actorSystem.actorOf(props);

        final RetrieveConnectionStatus msg1 = probe.expectMsgClass(RetrieveConnectionStatus.class);
        assertThat((CharSequence) msg1.getConnectionEntityId()).isEqualTo(connectionId1);
        final RetrieveConnectionStatus msg2 = probe.expectMsgClass(RetrieveConnectionStatus.class);
        assertThat((CharSequence) msg2.getConnectionEntityId()).isEqualTo(connectionId3);
    }};
}
private <T> Source<List<T>, NotUsed> unfoldBatchedSource(
        final String lowerBound,
        final ActorMaterializer mat,
        final Function<T, String> seedCreator,
        final Function<String, Source<T, ?>> sourceCreator) {

    return Source.unfoldAsync("",
            start -> {
                final String actualStart = lowerBound.compareTo(start) >= 0 ? lowerBound : start;
                return sourceCreator.apply(actualStart)
                        .runWith(Sink.seq(), mat)
                        .thenApply(list -> {
                            if (list.isEmpty()) {
                                return Optional.empty();
                            } else {
                                return Optional.of(Pair.create(seedCreator.apply(list.get(list.size() - 1)), list));
                            }
                        });
            })
            .withAttributes(Attributes.inputBuffer(1, 1));
}
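Source.unfoldAsync drives the batching above: each step asynchronously yields the next seed plus a batch, and an empty Optional terminates the stream. A stripped-down sketch of the same idea, where fetchPage is a hypothetical helper returning CompletionStage<List<String>>:

final Source<List<String>, NotUsed> pages =
        Source.unfoldAsync(0, offset ->
                fetchPage(offset).thenApply(page -> page.isEmpty()
                        ? Optional.<Pair<Integer, List<String>>>empty()
                        : Optional.of(Pair.create(offset + page.size(), page))));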
@Test
public void partialFailure() {
    // comment the next line to get logs for debugging
    actorSystem.eventStream().setLogLevel(Attributes.logLevelOff());

    new TestKit(actorSystem) {{
        final ActorRef underTest = watch(newSubscriptionActor(Duration.ofMinutes(1L), this));
        final String subscriptionId = underTest.path().name();
        final DittoRuntimeException error =
                InvalidRqlExpressionException.fromMessage("mock error", DittoHeaders.empty());
        // not possible to use Source.concat -- it forces the second source immediately.
        final Source<JsonArray, NotUsed> lazilyFailingSource =
                Source.from(List.of(Source.single(JsonArray.of(1)),
                        Source.lazily(() -> Source.<JsonArray>failed(error))))
                        .flatMapConcat(x -> x);
        connect(underTest, lazilyFailingSource, this);
        underTest.tell(RequestFromSubscription.of(subscriptionId, 1L, DittoHeaders.empty()), getRef());
        expectMsg(SubscriptionHasNextPage.of(subscriptionId, JsonArray.of(1), DittoHeaders.empty()));
        expectMsg(SubscriptionFailed.of(subscriptionId, error, DittoHeaders.empty()));
    }};
}
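For contrast, the eager variant that the comment above warns against would look like the sketch below (reusing the test's error). With plain concat both sub-sources are materialized together, so the failure can overtake the first element instead of following it:

// would NOT reliably deliver JsonArray.of(1) before the failure:
final Source<JsonArray, NotUsed> eagerlyFailingSource =
        Source.single(JsonArray.of(1)).concat(Source.<JsonArray>failed(error));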
/**
* Emit metadata to trigger index update if the persistence snapshot and the search index entry are inconsistent.
* Precondition: the thing IDs are identical and the search index entry is outside the tolerance window.
*
* @param persisted metadata from the snapshot store of the persistence.
* @param indexed metadata from the search index with the same thing ID.
* @return a source containing the metadata if the persistence and search index are inconsistent, or an empty source otherwise.
*/
private Source<Metadata, NotUsed> emitUnlessConsistent(final Metadata persisted, final Metadata indexed) {
    if (persisted.getThingRevision() > indexed.getThingRevision()) {
        return Source.single(indexed).log("RevisionMismatch");
    } else {
        final Optional<PolicyId> persistedPolicyId = persisted.getPolicyId();
        final Optional<PolicyId> indexedPolicyId = indexed.getPolicyId();
        if (!persistedPolicyId.equals(indexedPolicyId)) {
            return Source.single(indexed).log("PolicyIdMismatch");
        } else if (persistedPolicyId.isPresent()) {
            // policy IDs are equal and nonempty; retrieve and compare policy revision
            return retrievePolicyRevisionAndEmitMismatch(persistedPolicyId.get(), indexed);
        } else {
            // policy IDs are empty; the entries are consistent.
            return Source.empty();
        }
    }
}
private Source<Pair<String, JsonObject>, NotUsed> retrieveThingForElement(final String thingId) {
    if (thingIdOnly) {
        final JsonObject idOnlyThingJson = JsonObject.newBuilder().set(Thing.JsonFields.ID, thingId).build();
        return Source.single(Pair.create(thingId, idOnlyThingJson));
    } else {
        return retrieveThing(thingId, fields)
                .map(thingJson -> Pair.create(thingId, thingJson))
                .recoverWithRetries(1,
                        new PFBuilder<Throwable, Graph<SourceShape<Pair<String, JsonObject>>, NotUsed>>()
                                .match(ThingNotAccessibleException.class, thingNotAccessible -> {
                                    // out-of-sync thing detected
                                    final ThingsOutOfSync thingsOutOfSync =
                                            ThingsOutOfSync.of(Collections.singletonList(ThingId.of(thingId)),
                                                    getDittoHeaders());
                                    pubSubMediator.tell(
                                            DistPubSubAccess.publishViaGroup(ThingsOutOfSync.TYPE, thingsOutOfSync),
                                            ActorRef.noSender());
                                    return Source.empty();
                                })
                                .build()
                );
    }
}
private CompletionStage<Optional<Throwable>> generateStatusResponse() {
    final String id = UUID.randomUUID().toString();

    return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
            .flatMapConcat(s ->
                    Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
                            Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
                                    .map(DeleteResult::getDeletedCount)
                    )
            )
            .runWith(Sink.seq(), materializer)
            .handle((result, error) -> {
                if (error != null) {
                    return Optional.of(error);
                } else if (!Objects.equals(result, Collections.singletonList(1L))) {
                    final String message = "Expect 1 document inserted and deleted. Found: " + result;
                    return Optional.of(new IllegalStateException(message));
                } else {
                    return Optional.empty();
                }
            });
}
@Override
public Source<Long, NotUsed> count(final Query query,
        @Nullable final List<String> authorizationSubjectIds) {

    checkNotNull(query, "query");

    final BsonDocument queryFilter = getMongoFilter(query, authorizationSubjectIds);
    log.debug("count with query filter <{}>.", queryFilter);

    final CountOptions countOptions = new CountOptions()
            .skip(query.getSkip())
            .limit(query.getLimit())
            .maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS);

    return Source.fromPublisher(collection.count(queryFilter, countOptions))
            .mapError(handleMongoExecutionTimeExceededException())
            .log("count");
}
/**
* Creates the capped collection {@code collectionName} using {@code clientWrapper} if it doesn't exist yet.
*
* @param database The database to use.
* @param collectionName The name of the capped collection that should be created.
* @param cappedCollectionSizeInBytes The size in bytes of the collection that should be created.
* @param materializer The actor materializer to pre-materialize the restart source.
* @return the created or retrieved collection.
*/
private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes,
        final ActorMaterializer materializer) {

    final Source<Success, NotUsed> createCollectionSource =
            repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes);

    final Source<MongoCollection, NotUsed> infiniteCollectionSource =
            createCollectionSource.map(success -> database.getCollection(collectionName))
                    .flatMapConcat(Source::repeat);

    final Source<MongoCollection, NotUsed> restartSource =
            RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0, () -> infiniteCollectionSource);

    // pre-materialize source with BroadcastHub so that a successfully obtained capped collection is reused
    // until the stream fails, whereupon it gets recreated with backoff.
    return restartSource.runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
}
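The pre-materialization trick generalizes: run any restartable source into a BroadcastHub once, and every later materialization attaches to the same running upstream. A minimal sketch, assuming an Akka version where RestartSource accepts java.time.Duration:

final Source<Integer, NotUsed> restarting =
        RestartSource.withBackoff(Duration.ofSeconds(1), Duration.ofSeconds(30), 0.2,
                () -> Source.range(1, 100));
// materialize once; later materializations of `shared` all read from the same hub
final Source<Integer, NotUsed> shared =
        restarting.runWith(BroadcastHub.of(Integer.class, 256), materializer);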
private static Source<Success, NotUsed> repeatableCreateCappedCollectionSource(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes) {

    final CreateCollectionOptions collectionOptions = new CreateCollectionOptions()
            .capped(true)
            .sizeInBytes(cappedCollectionSizeInBytes)
            .maxDocuments(1);

    return Source.lazily(
            () -> Source.fromPublisher(database.createCollection(collectionName, collectionOptions)))
            .mapMaterializedValue(whatever -> NotUsed.getInstance())
            .withAttributes(Attributes.inputBuffer(1, 1))
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<Success, NotUsed>>()
                    .match(MongoCommandException.class,
                            MongoTimestampPersistence::isCollectionAlreadyExistsError,
                            error -> Source.single(Success.SUCCESS))
                    .build());
}
private <T> Source<T, NotUsed> processSearchPersistenceResult(Source<T, NotUsed> source,
        final DittoHeaders dittoHeaders) {

    final Flow<T, T, NotUsed> logAndFinishPersistenceSegmentFlow =
            Flow.fromFunction(result -> {
                // we know that the source provides exactly one ResultList
                LogUtil.enhanceLogWithCorrelationId(log, dittoHeaders.getCorrelationId());
                log.debug("Persistence returned: {}", result);
                return result;
            });

    return source.via(logAndFinishPersistenceSegmentFlow);
}
@Test
public void twoPages() {
    new TestKit(actorSystem) {{
        final ActorRef underTest = watch(newSubscriptionActor(Duration.ofMinutes(1L), this));
        final String subscriptionId = underTest.path().name();
        connect(underTest, Source.from(List.of(JsonArray.of(1), JsonArray.of(2))), this);
        underTest.tell(RequestFromSubscription.of(subscriptionId, 2L, DittoHeaders.empty()), getRef());
        expectMsg(SubscriptionHasNextPage.of(subscriptionId, JsonArray.of(1), DittoHeaders.empty()));
        expectMsg(SubscriptionHasNextPage.of(subscriptionId, JsonArray.of(2), DittoHeaders.empty()));
        expectMsg(SubscriptionComplete.of(underTest.path().name(), DittoHeaders.empty()));
    }};
}
private Source<Document, NotUsed> findAllInternal(final Query query, final List<String> authorizationSubjectIds,
        @Nullable final Set<String> namespaces,
        @Nullable final Integer limit,
        @Nullable final Duration maxQueryTime) {

    checkNotNull(query, "query");

    final BsonDocument queryFilter = getMongoFilter(query, authorizationSubjectIds);
    if (log.isDebugEnabled()) {
        log.debug("findAll with query filter <{}>.", queryFilter);
    }

    final Bson sortOptions = getMongoSort(query);
    final int skip = query.getSkip();
    final Bson projection = GetSortBsonVisitor.projections(query.getSortOptions());
    final FindPublisher<Document> findPublisher =
            collection.find(queryFilter, Document.class)
                    .hint(hints.getHint(namespaces).orElse(null))
                    .sort(sortOptions)
                    .skip(skip)
                    .projection(projection);
    final FindPublisher<Document> findPublisherWithLimit = limit != null
            ? findPublisher.limit(limit)
            : findPublisher;
    final FindPublisher<Document> findPublisherWithMaxQueryTime = maxQueryTime != null
            ? findPublisherWithLimit.maxTime(maxQueryTime.getSeconds(), TimeUnit.SECONDS)
            : findPublisherWithLimit;

    return Source.fromPublisher(findPublisherWithMaxQueryTime);
}
@SuppressWarnings("unchecked")
private Source<PolicyReferenceTag, NotUsed> mapDumpResult(final Object dumpResult) {
    if (dumpResult instanceof Map) {
        return persistence.getPolicyReferenceTags((Map<PolicyId, Long>) dumpResult);
    } else {
        if (dumpResult instanceof Throwable) {
            log.error((Throwable) dumpResult, "dump failed");
        } else {
            log.warning("Unexpected dump result: <{}>", dumpResult);
        }
        return Source.empty();
    }
}
private Route handleDevOpsPerRequest(final RequestContext ctx,
        final Source<ByteString, ?> payloadSource,
        final Function<String, DevOpsCommand<?>> requestJsonToCommandFunction) {

    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();

    payloadSource
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .map(requestJsonToCommandFunction)
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
                    AbstractHttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);

    return completeWithFuture(httpResponseFuture);
}
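The fold over ByteString is the usual way to drain a request body into one strict value before parsing. In isolation (a sketch, assuming payloadSource and materializer are in scope):

final CompletionStage<String> strictBody =
        payloadSource
                .fold(ByteString.empty(), ByteString::concat)
                .map(ByteString::utf8String)
                .runWith(Sink.head(), materializer);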
private void dropCollectionWithBackoff(final MongoCollection<Document> collection) {
    RuntimeException lastException = null;
    for (int i = 0; i < 20; ++i) {
        try {
            waitFor(Source.fromPublisher(collection.drop()));
            return;
        } catch (final RuntimeException e) {
            lastException = e;
            backoff();
        }
    }
    throw lastException;
}
private Source<Metadata, NotUsed> wrapAsResumeSource(final ThingId lowerBound,
        final Function<ThingId, Source<Metadata, ?>> sourceCreator) {

    return ResumeSource.onFailureWithBackoff(
            config.getMinBackoff(),
            config.getMaxBackoff(),
            config.getMaxRestarts(),
            config.getRecovery(),
            lowerBound,
            sourceCreator,
            1,
            lastMetadata -> nextLowerBound(lowerBound, lastMetadata));
}
@Test
public void ensureCollectionIsCapped() throws Exception {
    final MongoCollection<Document> collection =
            syncPersistence.getCollection().runWith(Sink.head(), materializer).toCompletableFuture().get();

    runBlocking(syncPersistence.setTimestamp(Instant.now()));
    runBlocking(syncPersistence.setTimestamp(Instant.now()));

    assertThat(runBlocking(Source.fromPublisher(collection.count()))).containsExactly(1L);
}
@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";

    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);

    // then
    flow.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
            .request(4)
            .expectNextUnordered(5d, 5.5);
}
@Test
void checkApplicationReplacedHandlerIsTriggered() {
    new TestKit(system) {{
        final Future<Source<ProducerRecord<String, String>, NotUsed>> wsToKafkaProcessor = Future.of(
                () -> AriEventProcessing.generateProducerRecordFromEvent(
                        fakeCommandsTopic,
                        fakeEventsAndResponsesTopic,
                        new Strict(applicationReplacedEvent),
                        getRef(),
                        system.log(),
                        genApplicationReplacedHandler.apply(getRef()))
        );
        assertThat(expectMsgClass(String.class), is("Shutdown triggered!"));
        assertThat(wsToKafkaProcessor.await().get(), is(Source.empty()));
    }};
}
private Seq<Seq<Integer>> run(Source<Integer, ?> source) {
    try {
        List<Seq<Integer>> result =
                source.runWith(sink, materializer).toCompletableFuture().get(10, TimeUnit.SECONDS);
        return Vector.ofAll(result);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        throw new RuntimeException(e);
    }
}
@Test
public void helloStream() throws Exception {
    // It is important to concat our source with a maybe; this ensures the connection doesn't get closed
    // once we've finished feeding in our elements. Taking 3 from the response stream then ensures the
    // connection does get closed once we've received the 3 elements.
    Source<String, ?> response = await(streamService.stream().invoke(
            Source.from(Arrays.asList("a", "b", "c"))
                    .concat(Source.maybe())));
    List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
    assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);
}
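Source.maybe materializes as a CompletableFuture<Optional<T>>, so a caller that keeps the materialized value can decide later when the stream (and with it the connection) completes. A sketch of that variant, assuming imports of akka.Done, akka.japi.Pair, and akka.stream.javadsl.Keep:

final Pair<CompletableFuture<Optional<String>>, CompletionStage<Done>> handles =
        Source.from(Arrays.asList("a", "b", "c"))
                .concatMat(Source.<String>maybe(), Keep.right())
                .toMat(Sink.ignore(), Keep.both())
                .run(materializer);
// later: complete with empty to end the stream gracefully
handles.first().complete(Optional.empty());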
@Test
public void timeout() {
    new TestKit(actorSystem) {{
        final ActorRef underTest = watch(newSubscriptionActor(Duration.ZERO, this));
        connect(underTest, Source.single(JsonArray.of(1)), this);
        final SubscriptionFailed subscriptionFailed = expectMsgClass(SubscriptionFailed.class);
        assertThat(subscriptionFailed.getError().getErrorCode()).isEqualTo(SubscriptionTimeoutException.ERROR_CODE);
    }};
}
@Test
public void test_a_source() {
    Sink<Object, TestSubscriber.Probe<Object>> sink = TestSink.probe(system);
    Source<Object, NotUsed> sourceUnderTest = Source.single("test");

    sourceUnderTest.runWith(sink, materializer)
            .request(1)
            .expectNext("test")
            .expectComplete();
}
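TestSink has a mirror image for the upstream side: TestSource.probe lets a test drive elements into a flow or sink under test by hand. A small sketch:

final TestPublisher.Probe<String> probe =
        TestSource.<String>probe(system)
                .toMat(Sink.ignore(), Keep.left())
                .run(materializer);
probe.sendNext("test");
probe.sendComplete();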
/**
* Discover inconsistencies between the persisted and indexed metadata and emit extra/nonexistent/mismatched
* entries of the search index.
*
* @param metadataFromSnapshots metadata streamed from the things snapshot store.
* @param metadataFromSearchIndex metadata streamed from the search index.
* @return source of inconsistent entries.
*/
public Source<Metadata, NotUsed> filterForInconsistencies(final Source<Metadata, ?> metadataFromSnapshots,
        final Source<Metadata, ?> metadataFromSearchIndex) {

    final Comparator<Metadata> comparator = BackgroundSyncStream::compareMetadata;
    return MergeSortedAsPair.merge(dummyMetadata(), comparator, metadataFromSnapshots, metadataFromSearchIndex)
            .throttle(throttleThroughput, throttlePeriod)
            .flatMapConcat(this::filterForInconsistency)
            // log elements at warning level because out-of-date metadata are detected
            .withAttributes(Attributes.logLevels(
                    Attributes.logLevelWarning(),
                    Attributes.logLevelDebug(),
                    Attributes.logLevelError()));
}
public Source<Pair<OrderPlaced, Offset>, ?> ordersStream(AggregateEventTag<PortfolioEvent> tag, Offset offset) {
    return persistentEntities.eventStream(tag, offset)
            .filter(eventOffset ->
                    eventOffset.first() instanceof PortfolioEvent.OrderPlaced
            )
            .mapAsync(1, eventOffset -> {
                PortfolioEvent.OrderPlaced orderPlaced = (PortfolioEvent.OrderPlaced) eventOffset.first();
                log.info(String.format("Publishing order %s", orderPlaced.getOrderId()));
                return CompletableFuture.completedFuture(Pair.create(
                        orderPlaced.asDomainEvent(),
                        eventOffset.second()
                ));
            });
}
/**
 * Materializes the given source and waits for it to successfully emit one element. It then completes the returned
 * CompletionStage with the full stream. It will wait indefinitely for that first element, so timeouts will have to
 * be handled separately on the stream, the returned future, or both.
 *
 * This is useful in cases where you want to "fail early" when handling a stream result. For example, you might want
 * to build an HTTP response based on a stream, but set a different status code if the stream fails to emit any
 * element.
 */
public static <T> CompletionStage<Source<T, NotUsed>> awaitOne(Source<T, ?> source, Materializer materializer) {
    return source.prefixAndTail(1).map(pair -> {
        if (pair.first().isEmpty()) {
            return pair.second();
        } else {
            T head = pair.first().get(0);
            Source<T, NotUsed> tail = pair.second();
            return Source.single(head).concat(tail);
        }
    }).runWith(Sink.head(), materializer);
}
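A typical use of awaitOne, following the javadoc's "fail early" idea (databaseResults is a hypothetical Source):

awaitOne(databaseResults, materializer)
        .thenApply(wholeStream -> "200 OK")                   // first element arrived; safe to stream the rest
        .exceptionally(error -> "500 Internal Server Error")  // stream failed before the first element
        .thenAccept(System.out::println);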
private <T> List<T> runBlockingWithReturn(final Source<T, NotUsed> publisher) {
    final CompletionStage<List<T>> done = publisher.runWith(Sink.seq(), mat);
    try {
        return done.toCompletableFuture().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
    } catch (final InterruptedException | ExecutionException | TimeoutException e) {
        throw new IllegalStateException(e);
    }
}
/**
* Loads the last known written offset from S3, or returns 0 if not found
*/
public CompletionStage<Long> loadOffset() {
    return download("_lastOffset")
            .reduce((bs1, bs2) -> bs1.concat(bs2))
            .map(bs -> Long.parseLong(bs.utf8String()))
            .recoverWith(new PFBuilder<Throwable, Source<Long, NotUsed>>()
                    .matchAny(x -> Source.single(0L)) // not found -> start at 0
                    .build()
            )
            .runWith(Sink.head(), materializer);
}
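The recoverWith fallback at the end is a handy general pattern for substituting a default when an upstream fails. Stand-alone (a sketch):

final Source<Long, NotUsed> withDefault =
        Source.<Long>failed(new RuntimeException("no such key"))
                .recoverWith(new PFBuilder<Throwable, Source<Long, NotUsed>>()
                        .matchAny(err -> Source.single(0L))
                        .build());
// emits a single 0L instead of failing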