org.springframework.http.ReactiveHttpInputMessage#reactor.test.publisher.TestPublisher源码实例Demo

下面列出了org.springframework.http.ReactiveHttpInputMessage#reactor.test.publisher.TestPublisher 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void toMonoVoidAsClientShouldConsumeAndCancel() {
	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	TestPublisher<DataBuffer> body = TestPublisher.create();

	BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
	MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
	response.setBody(body.flux());

	StepVerifier.create(extractor.extract(response, this.context))
			.then(() -> {
				body.assertWasSubscribed();
				body.emit(dataBuffer);
			})
			.verifyComplete();

	body.assertCancelled();
}
 
源代码2 项目: Moss   文件: AbstractEventHandlerTest.java
@Test
public void should_resubscribe_after_error() {
    TestPublisher<InstanceEvent> testPublisher = TestPublisher.create();

    TestEventHandler eventHandler = new TestEventHandler(testPublisher.flux());
    eventHandler.start();

    StepVerifier.create(eventHandler.getFlux())
            .expectSubscription()
            .then(() -> testPublisher.next(event))
            .expectNext(event)
            .then(() -> testPublisher.next(errorEvent))
            .expectNoEvent(Duration.ofMillis(100L))
            .then(() -> testPublisher.next(event))
            .expectNext(event)
            .thenCancel()
            .verify(Duration.ofSeconds(5));

}
 
源代码3 项目: Moss   文件: NotificationTriggerTest.java
@Test
public void should_notify_on_event() throws InterruptedException {
    //given
    Notifier notifier = mock(Notifier.class);
    TestPublisher<InstanceEvent> events = TestPublisher.create();
    NotificationTrigger trigger = new NotificationTrigger(notifier, events);
    trigger.start();
    Thread.sleep(500L); //wait for subscription

    //when registered event is emitted
    InstanceStatusChangedEvent event = new InstanceStatusChangedEvent(instance.getId(), instance.getVersion(),
        StatusInfo.ofDown());
    events.next(event);
    //then should notify
    verify(notifier, times(1)).notify(event);

    //when registered event is emitted but the trigger has been stopped
    trigger.stop();
    clearInvocations(notifier);
    events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
    //then should not notify
    verify(notifier, never()).notify(event);
}
 
源代码4 项目: reactor-core   文件: FluxBufferWhenTest.java
@Test
	public void openCloseEmptyBackpressure() {
		TestPublisher<Integer> source = TestPublisher.create();
		TestPublisher<Integer> open = TestPublisher.create();
		TestPublisher<Integer> close = TestPublisher.create();

		StepVerifier.create(source.flux()
				.bufferWhen(open, o -> close), 0)
		            .then(() -> {
		            	source.complete();
		            	open.assertNoSubscribers();
		            	close.assertNoSubscribers();
		            })
		            .verifyComplete();
//		ts.assertResult();
	}
 
@Test
public void shouldWriteFromPublisher() {
    Buffer firstChunk = Buffer.buffer("chunk 1");
    Buffer secondChunk = Buffer.buffer("chunk 2");

    TestPublisher<DataBuffer> source = TestPublisher.create();
    Mono<Void> result = request.writeWith(source);

    StepVerifier.create(result)
        .expectSubscription()
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(bufferConverter.toDataBuffer(firstChunk)))
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(bufferConverter.toDataBuffer(secondChunk)))
        .then(() -> source.assertMinRequested(1))
        .then(source::complete)
        .verifyComplete();

    verify(mockHttpClientRequest).write(firstChunk);
    verify(mockHttpClientRequest).write(secondChunk);
    verify(mockHttpClientRequest).end();
}
 
@Test
public void shouldWriteFromPublisher() {
    Buffer firstChunk = Buffer.buffer("chunk 1");
    Buffer secondChunk = Buffer.buffer("chunk 2");

    TestPublisher<DataBuffer> source = TestPublisher.create();
    Mono<Void> result = response.writeWithInternal(source);

    StepVerifier.create(result)
        .expectSubscription()
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(bufferConverter.toDataBuffer(firstChunk)))
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(bufferConverter.toDataBuffer(secondChunk)))
        .then(() -> source.assertMinRequested(1))
        .then(source::complete)
        .verifyComplete();

    verify(mockHttpServerResponse).write(firstChunk);
    verify(mockHttpServerResponse).write(secondChunk);
}
 
@Test
public void shouldWriteFromPublisherAndFlush() {
    Buffer firstChunk = Buffer.buffer("chunk 1");
    Buffer secondChunk = Buffer.buffer("chunk 2");

    TestPublisher<DataBuffer> source = TestPublisher.create();
    Mono<Void> result = response.writeAndFlushWithInternal(Flux.just(source));

    StepVerifier.create(result)
        .expectSubscription()
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(bufferConverter.toDataBuffer(firstChunk)))
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(bufferConverter.toDataBuffer(secondChunk)))
        .then(() -> source.assertMinRequested(1))
        .then(source::complete)
        .verifyComplete();

    verify(mockHttpServerResponse).write(firstChunk);
    verify(mockHttpServerResponse).write(secondChunk);
}
 
源代码8 项目: rsocket-java   文件: RSocketTest.java
void errorFromRequesterPublisher(
    TestPublisher<Payload> requesterPublisher,
    AssertSubscriber<Payload> requesterSubscriber,
    TestPublisher<Payload> responderPublisher,
    AssertSubscriber<Payload> responderSubscriber) {
  // ensures that after sending cancel the whole requestChannel is terminated
  requesterPublisher.error(EXCEPTION);
  // error should be propagated
  responderSubscriber
      .assertTerminated()
      .assertError(CustomRSocketException.class)
      .assertErrorMessage("test");
  requesterSubscriber
      .assertTerminated()
      .assertError(CustomRSocketException.class)
      .assertErrorMessage("test");

  // ensures that cancellation is propagated to the actual upstream
  responderPublisher.assertWasCancelled();
  responderPublisher.assertNoSubscribers();
}
 
@Test
public void shouldSendBinaryMessage() {
    TestPublisher<WebSocketMessage> source = TestPublisher.create();
    Mono<Void> result = session.send(source);

    StepVerifier.create(result)
        .expectSubscription()
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(getWebSocketMessage(WebSocketMessage.Type.BINARY, "test1")))
        .then(() -> source.assertMinRequested(1))
        .then(() -> source.next(getWebSocketMessage(WebSocketMessage.Type.BINARY, "test2")))
        .then(() -> source.assertMinRequested(1))
        .then(source::complete)
        .verifyComplete();

    verify(mockServerWebSocket).writeBinaryMessage(Buffer.buffer("test1"));
    verify(mockServerWebSocket).writeBinaryMessage(Buffer.buffer("test2"));
}
 
源代码10 项目: reactor-core   文件: MonoMetricsTest.java
@Test
public void malformedOnError() {
	AtomicReference<Throwable> errorDropped = new AtomicReference<>();
	Hooks.onErrorDropped(errorDropped::set);
	Exception dropError = new IllegalStateException("malformedOnError");
	TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(CLEANUP_ON_TERMINATE);
	Mono<Integer> source = testPublisher.mono().hide();

	new MonoMetrics<>(source, registry)
			.subscribe();

	testPublisher.complete()
	             .error(dropError);

	Counter malformedMeter = registry
			.find(METER_MALFORMED)
			.counter();

	assertThat(malformedMeter).isNotNull();
	assertThat(malformedMeter.count()).isEqualTo(1);
	assertThat(errorDropped).hasValue(dropError);
}
 
源代码11 项目: rsocket-java   文件: RSocketTest.java
@Test
public void requestChannelCase_ErrorFromRequesterShouldTerminatesStreamsOnBothSides() {
  TestPublisher<Payload> requesterPublisher = TestPublisher.create();
  AssertSubscriber<Payload> requesterSubscriber = new AssertSubscriber<>(0);

  AssertSubscriber<Payload> responderSubscriber = new AssertSubscriber<>(0);
  TestPublisher<Payload> responderPublisher = TestPublisher.create();

  initRequestChannelCase(
      requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber);

  nextFromResponderPublisher(responderPublisher, requesterSubscriber);

  nextFromRequesterPublisher(requesterPublisher, responderSubscriber);

  // ensures both sides are terminated
  errorFromRequesterPublisher(
      requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber);
}
 
源代码12 项目: reactor-core   文件: FluxWindowWhenTest.java
@Test
public void startDoneThenError() {
	TestPublisher<Integer> source = TestPublisher.create();
	TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
	final TestPublisher<Integer> end = TestPublisher.create();

	StepVerifier.create(source.flux()
	                          .windowWhen(start, v -> end)
	                          .flatMap(Flux.identityFunction())
	)
	            .then(() -> start.error(new IllegalStateException("boom"))
	                             .error(new IllegalStateException("boom2")))
	            .expectErrorMessage("boom")
	            .verifyThenAssertThat()
	            .hasDroppedErrorWithMessage("boom2");

	source.assertNoSubscribers();
	//start doesn't cleanup and as such still has a subscriber
	end.assertNoSubscribers();
}
 
源代码13 项目: reactor-core   文件: FluxFlatMapTest.java
@Test
public void normalInnerArrayMoreThanDefaultArraySize4() {
	TestPublisher<Integer> ts = TestPublisher.create();
	TestPublisher<Integer> ts2 = TestPublisher.create();
	StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
	                        .hide()
	                        .flatMap(f -> f < 5 ? ts : ts2), 0)
	            .then(() -> ts.next(1, 2))
	            .thenRequest(2)
	            .expectNext(1, 2)
	            .thenRequest(2)
	            .expectNext(1, 2)
	            .thenRequest(2)
	            .expectNext(1, 2)
	            .thenRequest(2)
	            .expectNext(1, 2)
	            .then(() -> ts2.complete())
	            .then(() -> ts.emit(3))
	            .thenRequest(4)
	            .expectNext(3, 3, 3, 3)
	            .verifyComplete();
}
 
源代码14 项目: reactor-core   文件: FluxBufferWhenTest.java
@Test
public void openCloseLimit() {
	TestPublisher<Integer> source = TestPublisher.create();
	TestPublisher<Integer> open = TestPublisher.create();
	TestPublisher<Integer> close = TestPublisher.create();

	StepVerifier.create(source.flux()
	                          .bufferWhen(open, o -> close)
	                          .limitRequest(1))
	            .then(() -> {
		            open.next(1);
		            close.complete();
	            })
	            .then(() -> {
		            source.assertNoSubscribers();
		            open.assertNoSubscribers();
		            close.assertNoSubscribers();
	            })
	            .expectNextMatches(List::isEmpty)
	            .verifyComplete();
}
 
源代码15 项目: reactor-core   文件: HooksTest.java
@Test
public void monoWrapPublisherDoesntCallAssemblyHook() {
	final Publisher<Integer> source = TestPublisher.create();
	Assertions.assertThat(source).isNotInstanceOf(Flux.class); //smoke test this is a Publisher

	//set the hook AFTER the original operators have been invoked (since they trigger assembly themselves)
	AtomicInteger wrappedCount = new AtomicInteger();
	Hooks.onEachOperator(p -> {
		wrappedCount.incrementAndGet();
		return p;
	});

	Hooks.convertToMonoBypassingHooks(source, true);
	Hooks.convertToMonoBypassingHooks(source, false);
	Mono.wrap(source, true);
	Mono.wrap(source, false);
	Assertions.assertThat(wrappedCount).hasValue(0);
}
 
源代码16 项目: rsocket-java   文件: RSocketRequesterTest.java
@Test
public void simpleOnDiscardRequestChannelTest2() {
  ByteBufAllocator allocator = rule.alloc();
  AssertSubscriber<Payload> assertSubscriber = AssertSubscriber.create(1);
  TestPublisher<Payload> testPublisher = TestPublisher.create();

  Flux<Payload> payloadFlux = rule.socket.requestChannel(testPublisher);

  payloadFlux.subscribe(assertSubscriber);

  testPublisher.next(ByteBufPayload.create("d", "m"));

  int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
  testPublisher.next(ByteBufPayload.create("d1", "m1"), ByteBufPayload.create("d2", "m2"));

  rule.connection.addToReceivedBuffer(
      ErrorFrameCodec.encode(
          allocator, streamId, new CustomRSocketException(0x00000404, "test")));

  Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);

  rule.assertHasNoLeaks();
}
 
@Test
public void timeoutLimit() {
	TestPublisher<Integer> tp = TestPublisher.create();
	StepVerifier.withVirtualTime(() ->
			tp.flux().onBackpressureBuffer(Duration.ofSeconds(1), 1, this, VirtualTimeScheduler.get()), 0)
	            .expectSubscription()
	            .then(() -> tp.next(1))
	            .expectNoEvent(Duration.ofMillis(500))
	            .then(() -> tp.next(2))
	            .expectNoEvent(Duration.ofMillis(500))
	            .then(() -> tp.next(3))
	            .expectNoEvent(Duration.ofMillis(500))
	            .then(() -> tp.next(4))
	            .expectNoEvent(Duration.ofMillis(500))
	            .then(() -> tp.next(5))
	            .expectNoEvent(Duration.ofMillis(500))
	            .then(tp::complete)
	            .thenRequest(1)
	            .expectNext(5)
	            .verifyComplete();

	assertThat(evicted).containsExactly(1, 2, 3, 4);
}
 
源代码18 项目: reactor-core   文件: FluxWindowWhenTest.java
@Test
public void startError() {
	TestPublisher<Integer> source = TestPublisher.create();
	TestPublisher<Integer> start = TestPublisher.create();
	final TestPublisher<Integer> end = TestPublisher.create();

	StepVerifier.create(source.flux()
	                          .windowWhen(start, v -> end)
	                          .flatMap(Flux.identityFunction())
	)
	            .then(() -> start.error(new IllegalStateException("boom")))
	            .expectErrorMessage("boom")
	            .verify();

	source.assertNoSubscribers();
	start.assertNoSubscribers();
	end.assertNoSubscribers();
}
 
源代码19 项目: reactor-core   文件: MonoDelayElementTest.java
@Test
public void errorAfterNextIsNeverTriggered() {
	TestPublisher<String> source = TestPublisher.create();
	AtomicReference<Throwable> errorDropped = new AtomicReference<>();
	Hooks.onErrorDropped(errorDropped::set);

	StepVerifier.withVirtualTime(() ->
			new MonoDelayElement<>(source.mono(), 2, TimeUnit.SECONDS, defaultSchedulerForDelay()))
	            .expectSubscription()
	            .then(() -> source.next("foo").error(new IllegalStateException("boom")))
	            .expectNoEvent(Duration.ofSeconds(2))
	            .expectNext("foo")
	            .verifyComplete();

	assertThat(errorDropped.get()).isNull();
}
 
源代码20 项目: reactor-core   文件: MonoDelayElementTest.java
@Test
public void cancelUpstreamOnceWhenRejected() {
	VirtualTimeScheduler vts = VirtualTimeScheduler.create();
	vts.dispose();

	TestPublisher<Object> testPublisher = TestPublisher.createCold();
	testPublisher.emit("Hello");

	StepVerifier.create(new MonoDelayElement<>(testPublisher.mono(), 2, TimeUnit.SECONDS, vts))
	            .verifyErrorSatisfies(e -> {
		            assertThat(e)
				            .isInstanceOf(RejectedExecutionException.class)
				            .hasMessage("Scheduler unavailable");
	            });

	testPublisher.assertWasRequested();
	testPublisher.assertCancelled(1);
}
 
@Test
public void shouldReturnFluxOfServices() {
	TestPublisher<String> discoveryClient1Publisher = TestPublisher.createCold();
	discoveryClient1Publisher.emit("serviceAFromClient1");
	discoveryClient1Publisher.emit("serviceBFromClient1");
	discoveryClient1Publisher.complete();

	TestPublisher<String> discoveryClient2Publisher = TestPublisher.createCold();
	discoveryClient2Publisher.emit("serviceCFromClient2");
	discoveryClient2Publisher.complete();

	when(discoveryClient1.getServices()).thenReturn(discoveryClient1Publisher.flux());
	when(discoveryClient2.getServices()).thenReturn(discoveryClient2Publisher.flux());

	ReactiveCompositeDiscoveryClient client = new ReactiveCompositeDiscoveryClient(
			asList(discoveryClient1, discoveryClient2));

	assertThat(client.description()).isEqualTo("Composite Reactive Discovery Client");

	Flux<String> services = client.getServices();

	StepVerifier.create(services).expectNext("serviceAFromClient1")
			.expectNext("serviceBFromClient1").expectNext("serviceCFromClient2")
			.expectComplete().verify();
}
 
源代码22 项目: reactor-core   文件: FluxSwitchOnFirstTest.java
@Test
public void shouldBeRequestedOneFromUpstreamTwiceInCaseOfConditional() throws InterruptedException {
    TestPublisher<Long> publisher = TestPublisher.createCold();
    ArrayList<Long> capture = new ArrayList<>();
    ArrayList<Long> requested = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);
    Flux<Long> switchTransformed = publisher.flux()
                                            .doOnRequest(requested::add)
                                            .switchOnFirst((first, innerFlux) -> innerFlux)
                                            .filter(e -> false);

    publisher.next(1L);
    publisher.complete();

    switchTransformed.subscribeWith(new LambdaSubscriber<>(capture::add, __ -> {}, latch::countDown, s -> s.request(1)));

    latch.await(5, TimeUnit.SECONDS);

    Assertions.assertThat(capture).isEmpty();
    Assertions.assertThat(requested).containsExactly(1L, 1L);
}
 
源代码23 项目: reactor-core   文件: FluxWindowWhenTest.java
@Test
public void startDoneThenComplete() {
	TestPublisher<Integer> source = TestPublisher.create();
	TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
	final TestPublisher<Integer> end = TestPublisher.create();

	StepVerifier.create(source.flux()
	                          .windowWhen(start, v -> end)
	                          .flatMap(Flux.identityFunction())
	)
	            .then(() -> start.error(new IllegalStateException("boom"))
	                             .complete())
	            .expectErrorMessage("boom")
	            .verifyThenAssertThat()
	            .hasNotDroppedErrors();

	source.assertNoSubscribers();
	//start doesn't cleanup and as such still has a subscriber
	end.assertNoSubscribers();
}
 
源代码24 项目: reactor-core   文件: MonoExpandTest.java
@Test
public void depthCompleteCancelRace() {
	for (int i = 0; i < 1000; i++) {

		final TestPublisher<Integer> pp = TestPublisher.create();

		final AssertSubscriber<Integer> ts = AssertSubscriber.create(1);
		Mono.just(0)
		    .expandDeep(it -> pp)
		    .subscribe(ts);

		Runnable r1 = pp::complete;
		Runnable r2 = ts::cancel;

		RaceTestUtils.race(r1, r2, Schedulers.single());
	}
}
 
源代码25 项目: rsocket-java   文件: RSocketTest.java
void nextFromResponderPublisher(
    TestPublisher<Payload> responderPublisher, AssertSubscriber<Payload> requesterSubscriber) {
  // ensures that downstream is not terminated so the requestChannel state is half-closed
  responderPublisher.assertSubscribers(1);
  requesterSubscriber.assertNotTerminated();

  // ensures responderPublisher can send messages and outerSubscriber can receive them
  requesterSubscriber.request(5);
  responderPublisher.next(
      DefaultPayload.create("rd1", "rm1"),
      DefaultPayload.create("rd2"),
      DefaultPayload.create("rd3", "rm3"),
      DefaultPayload.create("rd4"),
      DefaultPayload.create("rd5", "rm5"));

  List<Payload> outerPayloads = requesterSubscriber.awaitAndAssertNextValueCount(5).values();
  Assertions.assertThat(outerPayloads.stream().map(Payload::getDataUtf8))
      .containsExactly("rd1", "rd2", "rd3", "rd4", "rd5");
  Assertions.assertThat(outerPayloads.stream().map(Payload::hasMetadata))
      .containsExactly(true, false, true, false, true);
  Assertions.assertThat(outerPayloads.stream().map(Payload::getMetadataUtf8))
      .containsExactly("rm1", "", "rm3", "", "rm5");
}
 
@Test
public void onBackpressureBufferMaxCallbackSourceEmitsAfterComplete() {
	TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(TestPublisher.Violation.DEFER_CANCELLATION);
	CopyOnWriteArrayList<Integer> overflown = new CopyOnWriteArrayList<>();
	AtomicInteger producedCounter = new AtomicInteger();

	StepVerifier.create(testPublisher.flux()
	                                 .doOnNext(i -> producedCounter.incrementAndGet())
	                                 .onBackpressureBuffer(3, overflown::add),
			StepVerifierOptions.create().initialRequest(0).checkUnderRequesting(false))
	            .thenRequest(5)
				.then(() -> testPublisher.next(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
	            .expectNext(1, 2, 3, 4, 5)
	            .thenAwait() //at this point the buffer is overrun since the range request was unbounded
	            .thenRequest(100) //requesting more empties the buffer before an overflow error is propagated
	            .expectNext(6, 7, 8)
	            .expectErrorMatches(Exceptions::isOverflow)
	            .verifyThenAssertThat()
	            .hasDroppedExactly(10, 11, 12, 13, 14, 15);

	//the rest, asserted above, is dropped because the source was cancelled
	assertThat(overflown).as("passed to overflow handler").containsExactly(9);
	assertThat(producedCounter).as("bad source produced").hasValue(15);
}
 
源代码27 项目: reactor-core   文件: FluxMetricsTest.java
@Test
public void malformedOnError() {
	AtomicReference<Throwable> errorDropped = new AtomicReference<>();
	Hooks.onErrorDropped(errorDropped::set);
	Exception dropError = new IllegalStateException("malformedOnError");
	TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(CLEANUP_ON_TERMINATE);
	Flux<Integer> source = testPublisher.flux().hide();

	new FluxMetrics<>(source, registry)
			.subscribe();

	testPublisher.next(1)
	             .complete()
	             .error(dropError);

	Counter malformedMeter = registry
			.find(METER_MALFORMED)
			.counter();

	assertThat(malformedMeter).isNotNull();
	assertThat(malformedMeter.count()).isEqualTo(1);
	assertThat(errorDropped).hasValue(dropError);
}
 
源代码28 项目: reactor-core   文件: FluxWindowWhenTest.java
@Test
public void startDoneThenNext() {
	TestPublisher<Integer> source = TestPublisher.create();
	TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
	final TestPublisher<Integer> end = TestPublisher.create();

	StepVerifier.create(source.flux()
	                          .windowWhen(start, v -> end)
	                          .flatMap(Flux.identityFunction())
	)
	            .then(() -> start.error(new IllegalStateException("boom"))
	                             .next(1))
	            .expectErrorMessage("boom")
	            .verifyThenAssertThat()
	            .hasNotDroppedErrors()
	            .hasNotDroppedElements();

	source.assertNoSubscribers();
	//start doesn't cleanup and as such still has a subscriber
	end.assertNoSubscribers();
}
 
源代码29 项目: reactor-core   文件: FluxSwitchOnFirstTest.java
@Test
public void shouldNeverSendIncorrectRequestSizeToUpstream() throws InterruptedException {
    TestPublisher<Long> publisher = TestPublisher.createCold();
    AtomicLong capture = new AtomicLong();
    ArrayList<Long> requested = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);
    Flux<Long> switchTransformed = publisher.flux()
                                            .doOnRequest(requested::add)
                                            .switchOnFirst((first, innerFlux) -> innerFlux);

    publisher.next(1L);
    publisher.complete();

    switchTransformed.subscribeWith(new LambdaSubscriber<>(capture::set, __ -> {}, latch::countDown, s -> s.request(1)));

    latch.await(5, TimeUnit.SECONDS);

    Assertions.assertThat(capture.get()).isEqualTo(1L);
    Assertions.assertThat(requested).containsExactly(1L);
}
 
源代码30 项目: reactor-core   文件: ParallelFluxTest.java
@Test
public void parallelSubscribeAndDispose() throws InterruptedException {
	AtomicInteger nextCount = new AtomicInteger();
	CountDownLatch cancelLatch = new CountDownLatch(1);
	TestPublisher<Integer> source = TestPublisher.create();

	Disposable d = source
			.flux()
			.parallel(3)
			.doOnCancel(cancelLatch::countDown)
			.subscribe(i -> nextCount.incrementAndGet());

	source.next(1, 2, 3);
	d.dispose();

	source.emit(4, 5, 6);

	boolean finished = cancelLatch.await(300, TimeUnit.MILLISECONDS);

	assertThat(finished).as("cancelled latch").isTrue();
	assertThat(d.isDisposed()).as("disposed").isTrue();
	assertThat(nextCount.get()).as("received count").isEqualTo(3);
}