org.junit.jupiter.api.Timeout#reactor.core.publisher.Flux源码实例Demo

下面列出了 org.junit.jupiter.api.Timeout#reactor.core.publisher.Flux 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: java-technology-stack   文件: DataBufferUtils.java
/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given
 * {@code AsynchronousFileChannel}, beginning at the given file position.
 * <p>The channel is <strong>not</strong> closed when the flux terminates, and the
 * data buffers in the source are <strong>not</strong>
 * {@linkplain #release(DataBuffer) released}. If releasing is required, subscribe
 * to the returned {@code Flux} with a {@link #releaseConsumer()}.
 * <p>Nothing is written until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
		Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {

	Assert.notNull(source, "'source' must not be null");
	Assert.notNull(channel, "'channel' must not be null");
	Assert.isTrue(position >= 0, "'position' must be >= 0");

	Flux<DataBuffer> buffers = Flux.from(source);
	return Flux.create(sink -> {
		// The handler is both the subscriber of the source and the sink's dispose hook.
		AsynchronousFileChannelWriteCompletionHandler writeHandler =
				new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
		sink.onDispose(writeHandler);
		buffers.subscribe(writeHandler);
	});
}
 
源代码2 项目: redisson   文件: RedissonReactiveStringCommands.java
/**
 * Issues a {@code BITCOUNT} for every command emitted by {@code commands}.
 * <p>When the command carries no range, or a range that is unbounded on both
 * sides, the whole value is counted. Otherwise the range bounds are sent as the
 * start and end byte offsets; an open lower bound defaults to {@code 0} and an
 * open upper bound defaults to {@code -1} (the last byte in Redis offset notation).
 *
 * @param commands the stream of bit count commands; each must carry a non-null key
 * @return one {@link NumericResponse} per command, pairing it with the reported count
 */
@Override
public Flux<NumericResponse<BitCountCommand, Long>> bitCount(Publisher<BitCountCommand> commands) {
    return execute(commands, command -> {

        Assert.notNull(command.getKey(), "Key must not be null!");

        Range<Long> range = command.getRange();
        byte[] keyBuf = toByteArray(command.getKey());

        Mono<Long> m;
        // Compare by value rather than identity: an unbounded range that is not the
        // shared Range.unbounded() instance must still take the "whole value" path.
        if (range == null || Range.unbounded().equals(range)) {
            m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.BITCOUNT, keyBuf);
        } else {
            // A half-open range previously failed with NoSuchElementException on the
            // missing upper bound; -1 addresses the last byte instead.
            m = write(keyBuf, StringCodec.INSTANCE, RedisCommands.BITCOUNT,
                    keyBuf, range.getLowerBound().getValue().orElse(0L),
                    range.getUpperBound().getValue().orElse(-1L));
        }
        return m.map(v -> new NumericResponse<>(command, v));
    });
}
 
源代码3 项目: james-project   文件: ReactorUtilsTest.java
/**
 * Registers two MDC contexts under different names and checks that both
 * key/value pairs are visible inside the logging callback.
 */
@Test
void contextShouldCombineMDCs() {
    String firstKey = "key1";
    String firstValue = "value1";
    String secondKey = "key2";
    String secondValue = "value2";

    Flux<Integer> withBothContexts = Flux.just(1)
        .doOnEach(ReactorUtils.log(() -> {
            assertThat(MDC.get(firstKey)).isEqualTo(firstValue);
            assertThat(MDC.get(secondKey)).isEqualTo(secondValue);
        }))
        .subscriberContext(ReactorUtils.context("test1", MDCBuilder.of(firstKey, firstValue)))
        .subscriberContext(ReactorUtils.context("test2", MDCBuilder.of(secondKey, secondValue)));

    withBothContexts.blockLast();
}
 
源代码4 项目: redisson   文件: RedissonReactiveListCommands.java
/**
 * Issues a {@code BRPOPLPUSH} for every command emitted by {@code commands},
 * passing the source key, the destination key and the timeout in whole seconds.
 *
 * @param commands the stream of commands; key, destination and timeout must be non-null
 * @return one {@link ByteBufferResponse} per command, wrapping the returned bytes
 */
@Override
public Flux<ByteBufferResponse<BRPopLPushCommand>> bRPopLPush(Publisher<BRPopLPushCommand> commands) {
    return execute(commands, command -> {

        Assert.notNull(command.getKey(), "Key must not be null!");
        Assert.notNull(command.getDestination(), "Destination key must not be null!");
        Assert.notNull(command.getTimeout(), "Timeout must not be null!");

        byte[] sourceKey = toByteArray(command.getKey());
        byte[] destinationKey = toByteArray(command.getDestination());
        long timeoutSeconds = command.getTimeout().getSeconds();

        Mono<byte[]> moved = write(sourceKey, ByteArrayCodec.INSTANCE, RedisCommands.BRPOPLPUSH,
                sourceKey, destinationKey, timeoutSeconds);
        return moved.map(bytes -> new ByteBufferResponse<>(command, ByteBuffer.wrap(bytes)));
    });
}
 
源代码5 项目: feign-reactive   文件: SmokeTest.java
/**
 * Stubs the flavors and mixins endpoints with JSON arrays of all enum values and
 * verifies that the client streams exactly those values and then completes.
 */
@Test
public void testSimpleGet_success() throws JsonProcessingException {

  String flavorsJson = TestUtils.MAPPER.writeValueAsString(Flavor.values());
  String mixinsJson = TestUtils.MAPPER.writeValueAsString(Mixin.values());

  wireMockRule.stubFor(
      get(urlEqualTo("/icecream/flavors"))
          .willReturn(aResponse()
              .withStatus(200)
              .withHeader("Content-Type", "application/json")
              .withBody(flavorsJson)));

  wireMockRule.stubFor(
      get(urlEqualTo("/icecream/mixins"))
          .willReturn(aResponse()
              .withStatus(200)
              .withHeader("Content-Type", "application/json")
              .withBody(mixinsJson)));

  // Both publishers are obtained before either one is verified.
  Flux<Flavor> availableFlavors = client.getAvailableFlavors();
  Flux<Mixin> availableMixins = client.getAvailableMixins();

  StepVerifier.create(availableFlavors)
      .expectNextSequence(asList(Flavor.values()))
      .verifyComplete();

  StepVerifier.create(availableMixins)
      .expectNextSequence(asList(Mixin.values()))
      .verifyComplete();
}
 
/**
 * Stubs the "list applications" request with two running application summaries
 * ({@code test-application-1} and {@code test-application-2}).
 */
private void mockAppResultsInAppList() {
	ApplicationSummary firstApp = ApplicationSummary.builder()
			.diskQuota(0)
			.id("test-application-id-1")
			.instances(1)
			.memoryLimit(0)
			.name("test-application-1")
			.requestedState("RUNNING")
			.runningInstances(1)
			.build();

	ApplicationSummary secondApp = ApplicationSummary.builder()
			.diskQuota(0)
			.id("test-application-id-2")
			.instances(1)
			.memoryLimit(0)
			.name("test-application-2")
			.requestedState("RUNNING")
			.runningInstances(1)
			.build();

	givenRequestListApplications(Flux.just(firstApp, secondApp));
}
 
源代码7 项目: reactor-core   文件: StepVerifierTests.java
/**
 * A failed {@code expectNextCount(1)} must raise an {@link AssertionError} that
 * names the failed expectation and carries the upstream
 * {@link ArrayIndexOutOfBoundsException} as its single suppressed exception.
 */
@Test
public void expectNextCountErrorIsSuppressed() {
	assertThatExceptionOfType(AssertionError.class)
			.isThrownBy(() ->
					StepVerifier.create(
							Flux.just("foo")
							    .flatMap(ignored -> { throw new ArrayIndexOutOfBoundsException(); }))
					            .expectNextCount(1)
					            .verifyError())
			.satisfies(failure -> {
				assertThat(failure)
						.hasMessageStartingWith("expectation \"expectNextCount(1)\" failed")
						.hasMessageContaining("signal: onError(java.lang.ArrayIndexOutOfBoundsException)");
				assertThat(failure.getSuppressed())
						.hasSize(1)
						.allMatch(ArrayIndexOutOfBoundsException.class::isInstance);
			});
}
 
源代码8 项目: spring-cloud-rsocket   文件: PingPongApp.java
/**
 * Wraps the given socket in a proxy whose {@code requestChannel} logs and counts
 * every incoming payload, and answers each one with a reply payload that carries
 * forwarding metadata for {@code "ping"}.
 *
 * @param rSocket the socket to wrap
 * @return the proxying socket
 */
@SuppressWarnings("Duplicates")
RSocket accept(RSocket rSocket) {
	return new RSocketProxy(rSocket) {

		@Override
		public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
			Flux<String> pings = Flux.from(payloads)
					.map(Payload::getDataUtf8)
					.doOnNext(text -> {
						int count = pingsReceived.incrementAndGet();
						log.info("received " + text + "(" + count + ") in Pong");
					});

			return pings.map(PingPongApp::reply).map(answer -> {
				ByteBuf body = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, answer);
				ByteBuf metadata = getForwardingMetadata(strategies, "ping", 1L);
				return DefaultPayload.create(body, metadata);
			});
		}
	};
}
 
源代码9 项目: reactor-core   文件: StepVerifierTests.java
/**
 * Requests a single element, expects it, then cancels; the one-second test
 * timeout fails the test if verification does not return.
 */
@Test(timeout = 1000L)
public void expectCancelDoNotHang() {
	Flux<String> twoElements = Flux.just("foo", "bar");

	StepVerifier.create(twoElements, 1)
	            .expectNext("foo")
	            .thenCancel()
	            .verify();
}
 
源代码10 项目: reactor-workshop   文件: R052_ParsingCsv.java
/**
 * Collects the same {@code Flux} twice and checks that both subscriptions
 * produce equal lists.
 */
@Test
public void loadingTwice() throws Exception {
	//given
	final Flux<Domain> allDomains = Domains.all();

	//when
	final List<Domain> firstLoad = allDomains.collectList().block();
	final List<Domain> secondLoad = allDomains.collectList().block();

	//then
	assertThat(firstLoad).isEqualTo(secondLoad);
}
 
源代码11 项目: james-project   文件: MailboxManagerTest.java
/**
 * USER_2 creates a mailbox, appends a message and grants USER_1 the Read and
 * Lookup rights; a match-all search run in USER_1's session must then return
 * exactly that message. Skipped when the manager lacks the ACL capability.
 */
@Test
void searchForMessageShouldReturnMessagesFromMyDelegatedMailboxes() throws Exception {
    assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.ACL));

    session = mailboxManager.createSystemSession(USER_1);
    MailboxSession ownerSession = mailboxManager.createSystemSession(USER_2);
    MailboxPath sharedPath = MailboxPath.forUser(USER_2, "SHARED");
    MailboxId sharedMailboxId = mailboxManager.createMailbox(sharedPath, ownerSession).get();
    MessageManager sharedMailbox = mailboxManager.getMailbox(sharedMailboxId, ownerSession);

    MessageId appendedMessageId = sharedMailbox
        .appendMessage(AppendCommand.from(message), ownerSession)
        .getId()
        .getMessageId();

    // Delegate read access on the shared mailbox to USER_1.
    mailboxManager.setRights(
        sharedPath,
        MailboxACL.EMPTY.apply(
            MailboxACL.command()
                .forUser(USER_1)
                .rights(MailboxACL.Right.Read, MailboxACL.Right.Lookup)
                .asAddition()),
        ownerSession);

    MultimailboxesSearchQuery matchAllQuery = MultimailboxesSearchQuery
        .from(SearchQuery.matchAll())
        .build();

    assertThat(
        Flux.from(mailboxManager.search(matchAllQuery, session, DEFAULT_MAXIMUM_LIMIT))
            .collectList()
            .block())
        .containsOnly(appendedMessageId);
}
 
/**
 * Reads the {@code deviceId} parameter and, when it is present, queries that
 * device's history; the {@code history} parameter defaults to {@code 1}.
 * Emits nothing when {@code deviceId} is absent.
 */
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
    // Original note: merge historical and real-time data.
    // Only the historical query is issued here.
    return Mono.justOrEmpty(parameter.getString("deviceId"))
        .flatMapMany(deviceId -> fromHistory(deviceId, parameter.getInt("history").orElse(1)));
}
 
/**
 * Builds a stream that emits {@code v1}, then {@code v2}, then fails with {@code e}.
 */
@Override
protected Optional<Flux> createInstanceEmittingAMultipleValuesAndFailure(String v1, String v2, RuntimeException e) {
    return Optional.of(Flux.<String>create(sink -> {
        sink.next(v1);
        sink.next(v2);
        sink.error(e);
    }));
}
 
/**
 * Deletes (diskless) or marks as deleted (all other) every resource of the named
 * resource definition, commits, and then returns the flux that updates the
 * satellites followed by the data-deletion step.
 * Returns an empty flux when the resource definition cannot be loaded.
 */
private Flux<ApiCallRc> deleteRemainingInTransaction(ResourceName rscName)
{
    ResourceDefinition rscDfn = ctrlApiDataLoader.loadRscDfn(rscName, false);

    if (rscDfn == null)
    {
        return Flux.empty();
    }

    for (Resource rsc : getRscStreamPrivileged(rscDfn).collect(Collectors.toList()))
    {
        if (!isDisklessPrivileged(rsc))
        {
            markDeletedPrivileged(rsc);
        }
        else
        {
            deletePrivileged(rsc);
        }
    }
    ctrlTransactionHelper.commit();

    // The same deletion flux is handed to the satellite update and concatenated afterwards.
    Flux<ApiCallRc> deleteDataStep = deleteData(rscName);
    Flux<ApiCallRc> satelliteResponses = ctrlSatelliteUpdateCaller.updateSatellites(rscDfn, deleteDataStep)
        .transform(updateResponses -> CtrlResponseUtils.combineResponses(
            updateResponses,
            rscName,
            "Resource {1} on {0} deleted"
        ));

    return satelliteResponses
        .concatWith(deleteDataStep)
        .onErrorResume(CtrlResponseUtils.DelayedApiRcException.class, ignored -> Flux.empty());
}
 
/**
 * Streams {@code "foo"} and then fails with a {@link RuntimeException} when
 * {@code "bar"} is reached.
 */
@GetMapping("/bang")
public Flux<?> bang() {
	Flux<String> values = Flux.fromArray(new String[] { "foo", "bar" });
	return values.map(value -> {
		if (!value.equals("bar")) {
			return value;
		}
		throw new RuntimeException("Bar");
	});
}
 
源代码16 项目: micrometer   文件: LongTaskTimerSample.java
/**
 * Sample driver for a {@code LongTaskTimer}: once per second it may start a new
 * task (only while no task is active) and then counts every active task down,
 * stopping the timer sample of each task whose latch reaches zero.
 * Blocks on the second interval flux.
 */
public static void main(String[] args) {
    MeterRegistry registry = SampleConfig.myMonitoringSystem();
    LongTaskTimer timer = registry.more().longTaskTimer("longTaskTimer");

    RandomEngine engine = new MersenneTwister64(0);
    Normal incomingRequests = new Normal(0, 1, engine);
    Normal duration = new Normal(30, 50, engine);

    // Re-sampled on every one-second tick; nothing else in this method reads it.
    AtomicInteger latencyForThisSecond = new AtomicInteger(duration.nextInt());
    Flux.interval(Duration.ofSeconds(1))
            .doOnEach(tick -> latencyForThisSecond.set(duration.nextInt()))
            .subscribe();

    final Map<LongTaskTimer.Sample, CountDownLatch> tasks = new ConcurrentHashMap<>();

    // the potential for an "incoming request" on every one-second tick
    Flux.interval(Duration.ofSeconds(1))
            .doOnEach(tick -> {
                boolean requestArrived = incomingRequests.nextDouble() + 0.4 > 0;
                if (requestArrived && tasks.isEmpty()) {
                    // Draw until a non-negative task length comes up.
                    int taskDur;
                    do {
                        taskDur = duration.nextInt();
                    } while (taskDur < 0);

                    synchronized (tasks) {
                        tasks.put(timer.start(), new CountDownLatch(taskDur));
                    }
                }

                synchronized (tasks) {
                    for (Map.Entry<LongTaskTimer.Sample, CountDownLatch> entry : tasks.entrySet()) {
                        CountDownLatch remaining = entry.getValue();
                        remaining.countDown();
                        if (remaining.getCount() == 0) {
                            LongTaskTimer.Sample sample = entry.getKey();
                            sample.stop();
                            tasks.remove(sample);
                        }
                    }
                }
            })
            .blockLast();
}
 
源代码17 项目: linstor-server   文件: Volumes.java
/**
 * Parses the JSON body into a {@code VolumeModify} request, asks the API call
 * handler to modify the addressed volume, and hands the resulting flux to the
 * request helper, mapped to an {@code OK} response.
 *
 * @throws IOException if the JSON body cannot be read
 */
@PUT
@Path("{vlmNr}")
public void modifyVolume(
    @Context Request request,
    @Suspended final AsyncResponse asyncResponse,
    @PathParam("nodeName") String nodeName,
    @PathParam("rscName") String rscName,
    @PathParam("vlmNr") Integer vlmNr,
    String jsonData
)
    throws IOException
{
    JsonGenTypes.VolumeModify volumeModify = objectMapper
        .readValue(jsonData, JsonGenTypes.VolumeModify.class);

    Flux<ApiCallRc> modifyFlux = ctrlApiCallHandler.modifyVlm(
        null,
        nodeName,
        rscName,
        vlmNr,
        volumeModify.override_props,
        new HashSet<>(volumeModify.delete_props),
        new HashSet<>(volumeModify.delete_namespaces)
    );
    Flux<ApiCallRc> fluxWithContext = modifyFlux
        .subscriberContext(requestHelper.createContext(ApiConsts.API_MOD_VLM, request));

    requestHelper.doFlux(
        asyncResponse,
        ApiCallRcRestUtils.mapToMonoResponse(fluxWithContext, Response.Status.OK)
    );
}
 
/**
 * Joining a flux that emits two buffers and then fails must surface the
 * {@link RuntimeException}.
 */
@Test
public void joinErrors() {
	DataBuffer first = stringBuffer("foo");
	DataBuffer second = stringBuffer("bar");
	Flux<DataBuffer> failingSource =
			Flux.just(first, second).concatWith(Flux.error(new RuntimeException()));

	Mono<DataBuffer> joined = DataBufferUtils.join(failingSource);

	StepVerifier.create(joined)
			.expectError(RuntimeException.class)
			.verify();
}
 
源代码19 项目: reactor-core   文件: ScannableTest.java
@Test // checks Scannable.steps() from both ends of an operator chain with the debug hook on
public void operatorChainWithDebugMode() { // NOTE(review): expected strings embed ScannableTest.java line numbers, presumably assembly sites; keep the line layout
	Hooks.onOperatorDebug(); // NOTE(review): no matching reset is visible in this method; confirm a teardown handles it

	List<String> downstream = new ArrayList<>(); // steps as seen from the subscriber handed to the source
	List<String> upstream = new ArrayList<>(); // steps as seen from the assembled Mono

	Mono<?> m=
			Flux.from(s -> { // publisher lambda: only inspects the subscriber it receives, emits no signals
				Scannable thisSubscriber = Scannable.from(s);
				assertThat(thisSubscriber.isScanAvailable()).as("thisSubscriber.isScanAvailable").isTrue();
				thisSubscriber.steps().forEach(downstream::add);
			})
			    .map(a -> a)
			    .delayElements(Duration.ofMillis(10))
			    .filter(a -> true)
			    .reduce((a, b) -> b);

	m.subscribe(); // subscribing invokes the publisher lambda above, which fills 'downstream'

	Scannable thisOperator = Scannable.from(m);
	assertThat(thisOperator.isScanAvailable()).as("thisOperator.isScanAvailable").isTrue();

	thisOperator.steps().forEach(upstream::add);

	assertThat(downstream).containsExactly( // same five operator steps as 'upstream', plus the final "lambda" subscriber
			"Flux.from ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:537)",
			"Flux.map ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:542)",
			"Flux.delayElements ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:543)",
			"Flux.filter ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:544)",
			"Flux.reduce ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:545)",
			"lambda");

	assertThat(upstream).containsExactly( // operator steps only; no subscriber entry
			"Flux.from ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:537)",
			"Flux.map ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:542)",
			"Flux.delayElements ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:543)",
			"Flux.filter ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:544)",
			"Flux.reduce ⇢ at reactor.core.ScannableTest.operatorChainWithDebugMode(ScannableTest.java:545)");
}
 
源代码20 项目: reactor-core   文件: StepVerifierTests.java
/**
 * An empty flux must satisfy {@code expectNextCount(0)} followed by completion.
 */
@Test
public void expectNextCountZero() {
	StepVerifier.create(Flux.<String>empty())
	            .expectNextCount(0)
	            .expectComplete()
	            .verify();
}
 
/**
 * Queries the credentials stored under the given path and streams the
 * summaries contained in the response body.
 *
 * @param path the credential path; must not be {@code null}
 * @return the credential summaries found in the response
 */
@Override
public Flux<CredentialSummary> findByPath(final String path) {
	Assert.notNull(path, "credential path must not be null");

	return this.credHubOperations.doWithWebClient((client) ->
			client.get()
					.uri(PATH_URL_QUERY, path)
					.retrieve()
					.onStatus(HttpStatus::isError, ExceptionUtils::buildError)
					.bodyToMono(CredentialSummaryData.class)
					.flatMapMany((summaryData) -> Flux.fromIterable(summaryData.getCredentials())));
}
 
/**
 * Resolves the application IDs for the given service instance, logs each one at
 * debug level, and flat-maps every ID into its application streamer.
 */
@Override
public Flux<Envelope> getLogStream(String serviceInstanceId) {
	return this.applicationIdsProvider
		.getApplicationIds(serviceInstanceId)
		.doOnNext(appId -> LOG.debug("Starting log streaming for app with ID {}", appId))
		.flatMap(appId -> this.createApplicationStreamer(appId));
}
 
/**
 * Resolves the request body to a single value (a {@code Flux} body is collected
 * into a list, an empty body becomes an empty byte array), performs the exchange
 * through {@code restTemplate}, and wraps the result.
 * <p>A {@link ResourceAccessException} caused by a {@link SocketTimeoutException}
 * is mapped to {@code ReadTimeoutException}; an {@link HttpStatusCodeException}
 * is converted into an error response instead of an error signal.
 */
@Override
public Mono<ReactiveHttpResponse> executeRequest(ReactiveHttpRequest request) {

  Mono<Object> rawBody;
  if (request.body() instanceof Mono) {
    rawBody = ((Mono<Object>) request.body());
  } else if (request.body() instanceof Flux) {
    rawBody = ((Flux) request.body()).collectList();
  } else {
    rawBody = Mono.just(request.body());
  }
  Mono<Object> bodyOrEmptyBytes = rawBody.switchIfEmpty(Mono.just(new byte[0]));

  return bodyOrEmptyBytes
          .<ReactiveHttpResponse>flatMap(payload -> {
            MultiValueMap<String, String> requestHeaders = new LinkedMultiValueMap<>(request.headers());
            if (acceptGzip) {
              requestHeaders.add("Accept-Encoding", "gzip");
            }

            ResponseEntity exchanged = restTemplate.exchange(
                    request.uri().toString(),
                    HttpMethod.valueOf(request.method()),
                    new HttpEntity<>(payload, requestHeaders),
                    responseType());

            return Mono.just(new FakeReactiveHttpResponse(exchanged, returnPublisherType));
          })
          .onErrorMap(
                  failure -> failure instanceof ResourceAccessException
                          && failure.getCause() instanceof SocketTimeoutException,
                  ReadTimeoutException::new)
          .onErrorResume(
                  HttpStatusCodeException.class,
                  statusFailure -> Mono.just(new ErrorReactiveHttpResponse(statusFailure)));
}
 
/**
 * Demonstrates a hot publisher: the second subscriber joins after "sports" and
 * "cars" have already been pushed into the source.
 */
@Test
public void testHotPublisher(){
  UnicastProcessor<String> hotSource = UnicastProcessor.create();
  Flux<String> sharedNames = hotSource.publish().autoConnect();
  Flux<Category> hotPublisher = sharedNames.map((String name) -> Category.builder().name(name).build());

  hotPublisher.subscribe(received -> System.out.println("Subscriber 1: "+ received.getName()));
  hotSource.onNext("sports");
  hotSource.onNext("cars");

  hotPublisher.subscribe(received -> System.out.println("Subscriber 2: "+received.getName()));
  hotSource.onNext("games");
  hotSource.onNext("electronics");

  hotSource.onComplete();
}
 
源代码25 项目: reactor-workshop   文件: R043_Zip.java
/**
 * Zips a three-element number flux with a two-element string flux, once via
 * {@code zipWith} and once via {@code Flux.zip}, and logs the pairs of the first.
 */
@Test
public void zipTwoStreams() throws Exception {
	//given
	final Flux<Integer> numbers = Flux.just(1, 2, 3);
	final Flux<String> letters = Flux.just("a", "b");

	//when
	final Flux<Tuple2<Integer, String>> zippedWith = numbers.zipWith(letters);
	final Flux<Tuple2<Integer, String>> zippedStatic = Flux.zip(numbers, letters);  //same thing

	//then
	zippedWith.subscribe(pair -> log.info("Pair: {}", pair));
}
 
源代码26 项目: micrometer   文件: GaugeSample.java
/**
 * Sample driver for gauges: registers two gauges named {@code "gauge"} over the
 * same {@link AtomicLong} (the second reports the value minus one) and sets the
 * value to the absolute value of a fresh sample every five seconds.
 * Blocks on the interval flux.
 */
public static void main(String[] args) {
    MeterRegistry registry = SampleConfig.myMonitoringSystem();

    AtomicLong current = new AtomicLong();
    registry.gauge("gauge", Tags.of("k", "v"), current);
    registry.gauge("gauge", Tags.of("k", "v1"), current, value -> value.get() - 1);

    RandomEngine engine = new MersenneTwister64(0);
    Normal distribution = new Normal(0, 10, engine);

    Flux.interval(Duration.ofSeconds(5))
            .doOnEach(tick -> current.set(Math.abs(distribution.nextInt())))
            .blockLast();
}
 
/**
 * Writing a nested flux of {@code DefaultDataBuffer} must write status, headers
 * and cookies, and produce a single body buffer containing "foo".
 */
@Test  // SPR-14952
public void writeAndFlushWithFluxOfDefaultDataBuffer() throws Exception {
	TestServerHttpResponse response = new TestServerHttpResponse();
	Flux<Flux<DefaultDataBuffer>> nested = Flux.just(Flux.just(wrap("foo")));

	response.writeAndFlushWith(nested).block();

	assertTrue(response.statusCodeWritten);
	assertTrue(response.headersWritten);
	assertTrue(response.cookiesWritten);

	assertEquals(1, response.body.size());
	String written = new String(response.body.get(0).asByteBuffer().array(), StandardCharsets.UTF_8);
	assertEquals("foo", written);
}
 
/**
 * For every backing application, looks up the provider for each of its
 * credential provider specs and lets it add credentials; once all specs of an
 * application are processed, that application is emitted.
 *
 * @param backingApplications the applications to process
 * @param serviceInstanceGuid the service instance GUID passed to each provider
 * @return the processed applications, collected into a list
 */
public Mono<List<BackingApplication>> addCredentials(List<BackingApplication> backingApplications,
	String serviceInstanceGuid) {
	return Flux.fromIterable(backingApplications)
		.flatMap(application ->
			Flux.fromIterable(getSpecsForApplication(application))
				.flatMap(providerSpec -> locator
					.getByName(providerSpec.getName(), providerSpec.getArgs())
					.addCredentials(application, serviceInstanceGuid))
				.then(Mono.just(application)))
		.collectList();
}
 
源代码29 项目: Discord4J   文件: RestEntityRetriever.java
/**
 * Pages through the members of the given guild via the REST guild service
 * (paginating after the last user ID, starting at 0, page size 100) and maps
 * each entry to a {@code Member}.
 */
@Override
public Flux<Member> getGuildMembers(Snowflake guildId) {
    Function<Map<String, Object>, Flux<MemberData>> fetchPage = queryParams ->
            rest.getGuildService().getGuildMembers(guildId.asLong(), queryParams);

    return PaginationUtil
            .paginateAfter(fetchPage, memberData -> Snowflake.asLong(memberData.user().id()), 0, 100)
            .map(memberData -> new Member(gateway, memberData, guildId.asLong()));
}
 
/**
 * Prints the current count, saves two new employees, then counts again and
 * expects the second count to be 6.
 */
@Test
public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {

	Flux<Employee> newEmployees = Flux.just(
			new Employee(325, "Kim Jones", "Florida", "[email protected]", 42),
			new Employee(654, "Tom Moody", "New Hampshire", "[email protected]", 44));

	Mono<Long> countAfterSave = repository.count()
			.doOnNext(System.out::println)
			.thenMany(repository.saveAll(newEmployees))
			.last()
			.flatMap(lastSaved -> repository.count())
			.doOnNext(System.out::println);

	StepVerifier.create(countAfterSave)
			.expectNext(6L)
			.verifyComplete();
}