下面列出了org.junit.jupiter.api.Timeout#reactor.core.publisher.Flux 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
* Does <strong>not</strong> close the channel when the flux is terminated, and does
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
* {@link #releaseConsumer()}.
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @param position the file position at which the write is to begin; must be non-negative
* @return a flux containing the same buffers as in {@code source}, that starts the writing
* process when subscribed to, and that publishes any writing errors and the completion signal
*/
public static Flux<DataBuffer> write(
Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
Assert.notNull(source, "'source' must not be null");
Assert.notNull(channel, "'channel' must not be null");
Assert.isTrue(position >= 0, "'position' must be >= 0");
Flux<DataBuffer> flux = Flux.from(source);
return Flux.create(sink -> {
AsynchronousFileChannelWriteCompletionHandler completionHandler =
new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
sink.onDispose(completionHandler);
flux.subscribe(completionHandler);
});
}
@Override
public Flux<NumericResponse<BitCountCommand, Long>> bitCount(Publisher<BitCountCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Range<Long> range = command.getRange();
if (range == null) {
range = Range.unbounded();
}
byte[] keyBuf = toByteArray(command.getKey());
Mono<Long> m;
if (range == Range.<Long>unbounded()) {
m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.BITCOUNT, keyBuf);
} else {
m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.BITCOUNT,
keyBuf, range.getLowerBound().getValue().orElse(0L),
range.getUpperBound().getValue().get());
}
return m.map(v -> new NumericResponse<>(command, v));
});
}
@Test
void contextShouldCombineMDCs() {
String value1 = "value1";
String value2 = "value2";
String key1 = "key1";
String key2 = "key2";
Flux.just(1)
.doOnEach(ReactorUtils.log(() -> {
assertThat(MDC.get(key1)).isEqualTo(value1);
assertThat(MDC.get(key2)).isEqualTo(value2);
}))
.subscriberContext(ReactorUtils.context("test1", MDCBuilder.of(key1, value1)))
.subscriberContext(ReactorUtils.context("test2", MDCBuilder.of(key2, value2)))
.blockLast();
}
@Override
public Flux<ByteBufferResponse<BRPopLPushCommand>> bRPopLPush(Publisher<BRPopLPushCommand> commands) {
return execute(commands, command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getDestination(), "Destination key must not be null!");
Assert.notNull(command.getTimeout(), "Timeout must not be null!");
byte[] keyBuf = toByteArray(command.getKey());
byte[] destinationBuf = toByteArray(command.getDestination());
Mono<byte[]> m = write(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.BRPOPLPUSH,
keyBuf, destinationBuf, command.getTimeout().getSeconds());
return m.map(v -> new ByteBufferResponse<>(command, ByteBuffer.wrap(v)));
});
}
@Test
public void testSimpleGet_success() throws JsonProcessingException {
wireMockRule.stubFor(get(urlEqualTo("/icecream/flavors"))
.willReturn(aResponse().withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody(TestUtils.MAPPER.writeValueAsString(Flavor.values()))));
wireMockRule.stubFor(get(urlEqualTo("/icecream/mixins"))
.willReturn(aResponse().withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody(TestUtils.MAPPER.writeValueAsString(Mixin.values()))));
Flux<Flavor> flavors = client.getAvailableFlavors();
Flux<Mixin> mixins = client.getAvailableMixins();
StepVerifier.create(flavors)
.expectNextSequence(asList(Flavor.values()))
.verifyComplete();
StepVerifier.create(mixins)
.expectNextSequence(asList(Mixin.values()))
.verifyComplete();
}
private void mockAppResultsInAppList() {
givenRequestListApplications(Flux.just(ApplicationSummary.builder()
.diskQuota(0)
.id("test-application-id-1")
.instances(1)
.memoryLimit(0)
.name("test-application-1")
.requestedState("RUNNING")
.runningInstances(1)
.build(),
ApplicationSummary.builder()
.diskQuota(0)
.id("test-application-id-2")
.instances(1)
.memoryLimit(0)
.name("test-application-2")
.requestedState("RUNNING")
.runningInstances(1)
.build()));
}
@Test
public void expectNextCountErrorIsSuppressed() {
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> StepVerifier.create(Flux.just("foo")
.flatMap(r -> { throw new ArrayIndexOutOfBoundsException();}))
.expectNextCount(1)
.verifyError())
.satisfies(error -> {
assertThat(error)
.hasMessageStartingWith("expectation \"expectNextCount(1)\" failed")
.hasMessageContaining("signal: onError(java.lang.ArrayIndexOutOfBoundsException)");
assertThat(error.getSuppressed())
.hasSize(1)
.allMatch(spr -> spr instanceof ArrayIndexOutOfBoundsException);
});
}
@SuppressWarnings("Duplicates")
RSocket accept(RSocket rSocket) {
RSocket pong = new RSocketProxy(rSocket) {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).map(Payload::getDataUtf8).doOnNext(str -> {
int received = pingsReceived.incrementAndGet();
log.info("received " + str + "(" + received + ") in Pong");
}).map(PingPongApp::reply).map(reply -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
reply);
ByteBuf routingMetadata = getForwardingMetadata(strategies,
"ping", 1L);
return DefaultPayload.create(data, routingMetadata);
});
}
};
return pong;
}
@Test(timeout = 1000L)
public void expectCancelDoNotHang() {
StepVerifier.create(Flux.just("foo", "bar"), 1)
.expectNext("foo")
.thenCancel()
.verify();
}
@Test
public void loadingTwice() throws Exception {
//given
final Flux<Domain> domains = Domains.all();
//when
final List<Domain> first = domains.collectList().block();
final List<Domain> second = domains.collectList().block();
//then
assertThat(first).isEqualTo(second);
}
@Test
void searchForMessageShouldReturnMessagesFromMyDelegatedMailboxes() throws Exception {
assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.ACL));
session = mailboxManager.createSystemSession(USER_1);
MailboxSession sessionFromDelegater = mailboxManager.createSystemSession(USER_2);
MailboxPath delegatedMailboxPath = MailboxPath.forUser(USER_2, "SHARED");
MailboxId delegatedMailboxId = mailboxManager.createMailbox(delegatedMailboxPath, sessionFromDelegater).get();
MessageManager delegatedMessageManager = mailboxManager.getMailbox(delegatedMailboxId, sessionFromDelegater);
MessageId messageId = delegatedMessageManager
.appendMessage(AppendCommand.from(message), sessionFromDelegater)
.getId().getMessageId();
mailboxManager.setRights(delegatedMailboxPath,
MailboxACL.EMPTY.apply(MailboxACL.command()
.forUser(USER_1)
.rights(MailboxACL.Right.Read, MailboxACL.Right.Lookup)
.asAddition()),
sessionFromDelegater);
MultimailboxesSearchQuery multiMailboxesQuery = MultimailboxesSearchQuery
.from(SearchQuery.matchAll())
.build();
assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
.collectList().block())
.containsOnly(messageId);
}
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(1);
//合并历史数据和实时数据
return fromHistory(deviceId, history);
});
}
@Override
protected Optional<Flux> createInstanceEmittingAMultipleValuesAndFailure(String v1, String v2, RuntimeException e) {
Flux<String> stream = Flux.create(emitter -> {
emitter.next(v1);
emitter.next(v2);
emitter.error(e);
});
return Optional.of(stream);
}
private Flux<ApiCallRc> deleteRemainingInTransaction(ResourceName rscName)
{
ResourceDefinition rscDfn = ctrlApiDataLoader.loadRscDfn(rscName, false);
Flux<ApiCallRc> flux;
if (rscDfn == null)
{
flux = Flux.empty();
}
else
{
for (Resource rsc : getRscStreamPrivileged(rscDfn).collect(Collectors.toList()))
{
if (isDisklessPrivileged(rsc))
{
deletePrivileged(rsc);
}
else
{
markDeletedPrivileged(rsc);
}
}
ctrlTransactionHelper.commit();
Flux<ApiCallRc> nextStep = deleteData(rscName);
flux = ctrlSatelliteUpdateCaller.updateSatellites(rscDfn, nextStep)
.transform(updateResponses -> CtrlResponseUtils.combineResponses(
updateResponses,
rscName,
"Resource {1} on {0} deleted"
))
.concatWith(nextStep)
.onErrorResume(CtrlResponseUtils.DelayedApiRcException.class, ignored -> Flux.empty());
}
return flux;
}
@GetMapping("/bang")
public Flux<?> bang() {
return Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
public static void main(String[] args) {
MeterRegistry registry = SampleConfig.myMonitoringSystem();
LongTaskTimer timer = registry.more().longTaskTimer("longTaskTimer");
RandomEngine r = new MersenneTwister64(0);
Normal incomingRequests = new Normal(0, 1, r);
Normal duration = new Normal(30, 50, r);
AtomicInteger latencyForThisSecond = new AtomicInteger(duration.nextInt());
Flux.interval(Duration.ofSeconds(1))
.doOnEach(d -> latencyForThisSecond.set(duration.nextInt()))
.subscribe();
final Map<LongTaskTimer.Sample, CountDownLatch> tasks = new ConcurrentHashMap<>();
// the potential for an "incoming request" every 10 ms
Flux.interval(Duration.ofSeconds(1))
.doOnEach(d -> {
if (incomingRequests.nextDouble() + 0.4 > 0 && tasks.isEmpty()) {
int taskDur;
while ((taskDur = duration.nextInt()) < 0);
synchronized (tasks) {
tasks.put(timer.start(), new CountDownLatch(taskDur));
}
}
synchronized (tasks) {
for (Map.Entry<LongTaskTimer.Sample, CountDownLatch> e : tasks.entrySet()) {
e.getValue().countDown();
if (e.getValue().getCount() == 0) {
e.getKey().stop();
tasks.remove(e.getKey());
}
}
}
})
.blockLast();
}
@PUT
@Path("{vlmNr}")
public void modifyVolume(
@Context Request request,
@Suspended final AsyncResponse asyncResponse,
@PathParam("nodeName") String nodeName,
@PathParam("rscName") String rscName,
@PathParam("vlmNr") Integer vlmNr,
String jsonData
)
throws IOException
{
JsonGenTypes.VolumeModify modifyData = objectMapper
.readValue(jsonData, JsonGenTypes.VolumeModify.class);
Flux<ApiCallRc> flux = ctrlApiCallHandler.modifyVlm(
null,
nodeName,
rscName,
vlmNr,
modifyData.override_props,
new HashSet<>(modifyData.delete_props),
new HashSet<>(modifyData.delete_namespaces)
)
.subscriberContext(requestHelper.createContext(ApiConsts.API_MOD_VLM, request));
requestHelper.doFlux(asyncResponse, ApiCallRcRestUtils.mapToMonoResponse(flux, Response.Status.OK));
}
@Test
public void joinErrors() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
Mono<DataBuffer> result = DataBufferUtils.join(flux);
StepVerifier.create(result)
.expectError(RuntimeException.class)
.verify();
}
@Test
public void operatorChainWithDebugMode() {
Hooks.onOperatorDebug();
List<String> downstream = new ArrayList<>();
List<String> upstream = new ArrayList<>();
Mono<?> m=
Flux.from(s -> {
Scannable thisSubscriber = Scannable.from(s);
assertThat(thisSubscriber.isScanAvailable()).as("thisSubscriber.isScanAvailable").isTrue();
thisSubscriber.steps().forEach(downstream::add);
})
.map(a -> a)
.delayElements(Duration.ofMillis(10))
.filter(a -> true)
.reduce((a, b) -> b);
m.subscribe();
Scannable thisOperator = Scannable.from(m);
assertThat(thisOperator.isScanAvailable()).as("thisOperator.isScanAvailable").isTrue();
thisOperator.steps().forEach(upstream::add);
assertThat(downstream).containsExactly(
"Flux.from ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:537)",
"Flux.map ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:542)",
"Flux.delayElements ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:543)",
"Flux.filter ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:544)",
"Flux.reduce ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:545)",
"lambda");
assertThat(upstream).containsExactly(
"Flux.from ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:537)",
"Flux.map ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:542)",
"Flux.delayElements ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:543)",
"Flux.filter ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:544)",
"Flux.reduce ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:545)");
}
@Test
public void expectNextCountZero() {
Flux<String> flux = Flux.empty();
StepVerifier.create(flux)
.expectNextCount(0)
.expectComplete()
.verify();
}
@Override
public Flux<CredentialSummary> findByPath(final String path) {
Assert.notNull(path, "credential path must not be null");
return this.credHubOperations.doWithWebClient((webClient) -> webClient.get().uri(PATH_URL_QUERY, path)
.retrieve().onStatus(HttpStatus::isError, ExceptionUtils::buildError)
.bodyToMono(CredentialSummaryData.class)
.flatMapMany((data) -> Flux.fromIterable(data.getCredentials())));
}
@Override
public Flux<Envelope> getLogStream(String serviceInstanceId) {
return this.applicationIdsProvider
.getApplicationIds(serviceInstanceId)
.doOnNext(id -> LOG.debug("Starting log streaming for app with ID {}", id))
.flatMap(this::createApplicationStreamer);
}
@Override
public Mono<ReactiveHttpResponse> executeRequest(ReactiveHttpRequest request) {
Mono<Object> bodyMono;
if (request.body() instanceof Mono) {
bodyMono = ((Mono<Object>) request.body());
} else if (request.body() instanceof Flux) {
bodyMono = ((Flux) request.body()).collectList();
} else {
bodyMono = Mono.just(request.body());
}
bodyMono = bodyMono.switchIfEmpty(Mono.just(new byte[0]));
return bodyMono.<ReactiveHttpResponse>flatMap(body -> {
MultiValueMap<String, String> headers = new LinkedMultiValueMap<>(request.headers());
if (acceptGzip) {
headers.add("Accept-Encoding", "gzip");
}
ResponseEntity response =
restTemplate.exchange(request.uri().toString(), HttpMethod.valueOf(request.method()),
new HttpEntity<>(body, headers), responseType());
return Mono.just(new FakeReactiveHttpResponse(response, returnPublisherType));
})
.onErrorMap(ex -> ex instanceof ResourceAccessException
&& ex.getCause() instanceof SocketTimeoutException,
ReadTimeoutException::new)
.onErrorResume(HttpStatusCodeException.class,
ex -> Mono.just(new ErrorReactiveHttpResponse(ex)));
}
@Test
public void testHotPublisher(){
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<Category> hotPublisher = hotSource.publish()
.autoConnect().map((String t) -> Category.builder().name(t).build());
hotPublisher.subscribe(category -> System.out.println("Subscriber 1: "+ category.getName()));
hotSource.onNext("sports");
hotSource.onNext("cars");
hotPublisher.subscribe(category -> System.out.println("Subscriber 2: "+category.getName()));
hotSource.onNext("games");
hotSource.onNext("electronics");
hotSource.onComplete();
}
@Test
public void zipTwoStreams() throws Exception {
//given
final Flux<Integer> nums = Flux.just(1, 2, 3);
final Flux<String> strs = Flux.just("a", "b");
//when
final Flux<Tuple2<Integer, String>> pairs = nums.zipWith(strs);
final Flux<Tuple2<Integer, String>> pairs2 = Flux.zip(nums, strs); //same thing
//then
pairs.subscribe(p -> log.info("Pair: {}", p));
}
public static void main(String[] args) {
MeterRegistry registry = SampleConfig.myMonitoringSystem();
AtomicLong n = new AtomicLong();
registry.gauge("gauge", Tags.of("k", "v"), n);
registry.gauge("gauge", Tags.of("k", "v1"), n, n2 -> n2.get() - 1);
RandomEngine r = new MersenneTwister64(0);
Normal dist = new Normal(0, 10, r);
Flux.interval(Duration.ofSeconds(5))
.doOnEach(d -> n.set(Math.abs(dist.nextInt())))
.blockLast();
}
@Test // SPR-14952
public void writeAndFlushWithFluxOfDefaultDataBuffer() throws Exception {
TestServerHttpResponse response = new TestServerHttpResponse();
Flux<Flux<DefaultDataBuffer>> flux = Flux.just(Flux.just(wrap("foo")));
response.writeAndFlushWith(flux).block();
assertTrue(response.statusCodeWritten);
assertTrue(response.headersWritten);
assertTrue(response.cookiesWritten);
assertEquals(1, response.body.size());
assertEquals("foo", new String(response.body.get(0).asByteBuffer().array(), StandardCharsets.UTF_8));
}
public Mono<List<BackingApplication>> addCredentials(List<BackingApplication> backingApplications,
String serviceInstanceGuid) {
return Flux.fromIterable(backingApplications)
.flatMap(backingApplication -> {
List<CredentialProviderSpec> specs = getSpecsForApplication(backingApplication);
return Flux.fromIterable(specs)
.flatMap(spec -> {
CredentialProvider provider = locator.getByName(spec.getName(), spec.getArgs());
return provider.addCredentials(backingApplication, serviceInstanceGuid);
})
.then(Mono.just(backingApplication));
})
.collectList();
}
@Override
public Flux<Member> getGuildMembers(Snowflake guildId) {
Function<Map<String, Object>, Flux<MemberData>> doRequest = params ->
rest.getGuildService().getGuildMembers(guildId.asLong(), params);
return PaginationUtil.paginateAfter(doRequest, data -> Snowflake.asLong(data.user().id()), 0, 100)
.map(data -> new Member(gateway, data, guildId.asLong()));
}
@Test
public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
Mono<Long> saveAndCount = repository.count()
.doOnNext(System.out::println)
.thenMany(repository.saveAll(Flux.just(new Employee(325, "Kim Jones", "Florida", "[email protected]", 42),
new Employee(654, "Tom Moody", "New Hampshire", "[email protected]", 44))))
.last()
.flatMap(v -> repository.count())
.doOnNext(System.out::println);
StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
}