下面列出了org.springframework.http.ReactiveHttpInputMessage#reactor.test.publisher.TestPublisher 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void toMonoVoidAsClientShouldConsumeAndCancel() {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
TestPublisher<DataBuffer> body = TestPublisher.create();
BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.setBody(body.flux());
StepVerifier.create(extractor.extract(response, this.context))
.then(() -> {
body.assertWasSubscribed();
body.emit(dataBuffer);
})
.verifyComplete();
body.assertCancelled();
}
@Test
public void should_resubscribe_after_error() {
TestPublisher<InstanceEvent> testPublisher = TestPublisher.create();
TestEventHandler eventHandler = new TestEventHandler(testPublisher.flux());
eventHandler.start();
StepVerifier.create(eventHandler.getFlux())
.expectSubscription()
.then(() -> testPublisher.next(event))
.expectNext(event)
.then(() -> testPublisher.next(errorEvent))
.expectNoEvent(Duration.ofMillis(100L))
.then(() -> testPublisher.next(event))
.expectNext(event)
.thenCancel()
.verify(Duration.ofSeconds(5));
}
@Test
public void should_notify_on_event() throws InterruptedException {
//given
Notifier notifier = mock(Notifier.class);
TestPublisher<InstanceEvent> events = TestPublisher.create();
NotificationTrigger trigger = new NotificationTrigger(notifier, events);
trigger.start();
Thread.sleep(500L); //wait for subscription
//when registered event is emitted
InstanceStatusChangedEvent event = new InstanceStatusChangedEvent(instance.getId(), instance.getVersion(),
StatusInfo.ofDown());
events.next(event);
//then should notify
verify(notifier, times(1)).notify(event);
//when registered event is emitted but the trigger has been stopped
trigger.stop();
clearInvocations(notifier);
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should not notify
verify(notifier, never()).notify(event);
}
@Test
public void openCloseEmptyBackpressure() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> open = TestPublisher.create();
TestPublisher<Integer> close = TestPublisher.create();
StepVerifier.create(source.flux()
.bufferWhen(open, o -> close), 0)
.then(() -> {
source.complete();
open.assertNoSubscribers();
close.assertNoSubscribers();
})
.verifyComplete();
// ts.assertResult();
}
@Test
public void shouldWriteFromPublisher() {
Buffer firstChunk = Buffer.buffer("chunk 1");
Buffer secondChunk = Buffer.buffer("chunk 2");
TestPublisher<DataBuffer> source = TestPublisher.create();
Mono<Void> result = request.writeWith(source);
StepVerifier.create(result)
.expectSubscription()
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(bufferConverter.toDataBuffer(firstChunk)))
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(bufferConverter.toDataBuffer(secondChunk)))
.then(() -> source.assertMinRequested(1))
.then(source::complete)
.verifyComplete();
verify(mockHttpClientRequest).write(firstChunk);
verify(mockHttpClientRequest).write(secondChunk);
verify(mockHttpClientRequest).end();
}
@Test
public void shouldWriteFromPublisher() {
Buffer firstChunk = Buffer.buffer("chunk 1");
Buffer secondChunk = Buffer.buffer("chunk 2");
TestPublisher<DataBuffer> source = TestPublisher.create();
Mono<Void> result = response.writeWithInternal(source);
StepVerifier.create(result)
.expectSubscription()
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(bufferConverter.toDataBuffer(firstChunk)))
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(bufferConverter.toDataBuffer(secondChunk)))
.then(() -> source.assertMinRequested(1))
.then(source::complete)
.verifyComplete();
verify(mockHttpServerResponse).write(firstChunk);
verify(mockHttpServerResponse).write(secondChunk);
}
@Test
public void shouldWriteFromPublisherAndFlush() {
Buffer firstChunk = Buffer.buffer("chunk 1");
Buffer secondChunk = Buffer.buffer("chunk 2");
TestPublisher<DataBuffer> source = TestPublisher.create();
Mono<Void> result = response.writeAndFlushWithInternal(Flux.just(source));
StepVerifier.create(result)
.expectSubscription()
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(bufferConverter.toDataBuffer(firstChunk)))
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(bufferConverter.toDataBuffer(secondChunk)))
.then(() -> source.assertMinRequested(1))
.then(source::complete)
.verifyComplete();
verify(mockHttpServerResponse).write(firstChunk);
verify(mockHttpServerResponse).write(secondChunk);
}
void errorFromRequesterPublisher(
TestPublisher<Payload> requesterPublisher,
AssertSubscriber<Payload> requesterSubscriber,
TestPublisher<Payload> responderPublisher,
AssertSubscriber<Payload> responderSubscriber) {
// ensures that after sending cancel the whole requestChannel is terminated
requesterPublisher.error(EXCEPTION);
// error should be propagated
responderSubscriber
.assertTerminated()
.assertError(CustomRSocketException.class)
.assertErrorMessage("test");
requesterSubscriber
.assertTerminated()
.assertError(CustomRSocketException.class)
.assertErrorMessage("test");
// ensures that cancellation is propagated to the actual upstream
responderPublisher.assertWasCancelled();
responderPublisher.assertNoSubscribers();
}
@Test
public void shouldSendBinaryMessage() {
TestPublisher<WebSocketMessage> source = TestPublisher.create();
Mono<Void> result = session.send(source);
StepVerifier.create(result)
.expectSubscription()
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(getWebSocketMessage(WebSocketMessage.Type.BINARY, "test1")))
.then(() -> source.assertMinRequested(1))
.then(() -> source.next(getWebSocketMessage(WebSocketMessage.Type.BINARY, "test2")))
.then(() -> source.assertMinRequested(1))
.then(source::complete)
.verifyComplete();
verify(mockServerWebSocket).writeBinaryMessage(Buffer.buffer("test1"));
verify(mockServerWebSocket).writeBinaryMessage(Buffer.buffer("test2"));
}
@Test
public void malformedOnError() {
AtomicReference<Throwable> errorDropped = new AtomicReference<>();
Hooks.onErrorDropped(errorDropped::set);
Exception dropError = new IllegalStateException("malformedOnError");
TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(CLEANUP_ON_TERMINATE);
Mono<Integer> source = testPublisher.mono().hide();
new MonoMetrics<>(source, registry)
.subscribe();
testPublisher.complete()
.error(dropError);
Counter malformedMeter = registry
.find(METER_MALFORMED)
.counter();
assertThat(malformedMeter).isNotNull();
assertThat(malformedMeter.count()).isEqualTo(1);
assertThat(errorDropped).hasValue(dropError);
}
@Test
public void requestChannelCase_ErrorFromRequesterShouldTerminatesStreamsOnBothSides() {
TestPublisher<Payload> requesterPublisher = TestPublisher.create();
AssertSubscriber<Payload> requesterSubscriber = new AssertSubscriber<>(0);
AssertSubscriber<Payload> responderSubscriber = new AssertSubscriber<>(0);
TestPublisher<Payload> responderPublisher = TestPublisher.create();
initRequestChannelCase(
requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber);
nextFromResponderPublisher(responderPublisher, requesterSubscriber);
nextFromRequesterPublisher(requesterPublisher, responderSubscriber);
// ensures both sides are terminated
errorFromRequesterPublisher(
requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber);
}
@Test
public void startDoneThenError() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom"))
.error(new IllegalStateException("boom2")))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom2");
source.assertNoSubscribers();
//start doesn't cleanup and as such still has a subscriber
end.assertNoSubscribers();
}
@Test
public void normalInnerArrayMoreThanDefaultArraySize4() {
TestPublisher<Integer> ts = TestPublisher.create();
TestPublisher<Integer> ts2 = TestPublisher.create();
StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.hide()
.flatMap(f -> f < 5 ? ts : ts2), 0)
.then(() -> ts.next(1, 2))
.thenRequest(2)
.expectNext(1, 2)
.thenRequest(2)
.expectNext(1, 2)
.thenRequest(2)
.expectNext(1, 2)
.thenRequest(2)
.expectNext(1, 2)
.then(() -> ts2.complete())
.then(() -> ts.emit(3))
.thenRequest(4)
.expectNext(3, 3, 3, 3)
.verifyComplete();
}
@Test
public void openCloseLimit() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> open = TestPublisher.create();
TestPublisher<Integer> close = TestPublisher.create();
StepVerifier.create(source.flux()
.bufferWhen(open, o -> close)
.limitRequest(1))
.then(() -> {
open.next(1);
close.complete();
})
.then(() -> {
source.assertNoSubscribers();
open.assertNoSubscribers();
close.assertNoSubscribers();
})
.expectNextMatches(List::isEmpty)
.verifyComplete();
}
@Test
public void monoWrapPublisherDoesntCallAssemblyHook() {
final Publisher<Integer> source = TestPublisher.create();
Assertions.assertThat(source).isNotInstanceOf(Flux.class); //smoke test this is a Publisher
//set the hook AFTER the original operators have been invoked (since they trigger assembly themselves)
AtomicInteger wrappedCount = new AtomicInteger();
Hooks.onEachOperator(p -> {
wrappedCount.incrementAndGet();
return p;
});
Hooks.convertToMonoBypassingHooks(source, true);
Hooks.convertToMonoBypassingHooks(source, false);
Mono.wrap(source, true);
Mono.wrap(source, false);
Assertions.assertThat(wrappedCount).hasValue(0);
}
@Test
public void simpleOnDiscardRequestChannelTest2() {
ByteBufAllocator allocator = rule.alloc();
AssertSubscriber<Payload> assertSubscriber = AssertSubscriber.create(1);
TestPublisher<Payload> testPublisher = TestPublisher.create();
Flux<Payload> payloadFlux = rule.socket.requestChannel(testPublisher);
payloadFlux.subscribe(assertSubscriber);
testPublisher.next(ByteBufPayload.create("d", "m"));
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
testPublisher.next(ByteBufPayload.create("d1", "m1"), ByteBufPayload.create("d2", "m2"));
rule.connection.addToReceivedBuffer(
ErrorFrameCodec.encode(
allocator, streamId, new CustomRSocketException(0x00000404, "test")));
Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);
rule.assertHasNoLeaks();
}
@Test
public void timeoutLimit() {
TestPublisher<Integer> tp = TestPublisher.create();
StepVerifier.withVirtualTime(() ->
tp.flux().onBackpressureBuffer(Duration.ofSeconds(1), 1, this, VirtualTimeScheduler.get()), 0)
.expectSubscription()
.then(() -> tp.next(1))
.expectNoEvent(Duration.ofMillis(500))
.then(() -> tp.next(2))
.expectNoEvent(Duration.ofMillis(500))
.then(() -> tp.next(3))
.expectNoEvent(Duration.ofMillis(500))
.then(() -> tp.next(4))
.expectNoEvent(Duration.ofMillis(500))
.then(() -> tp.next(5))
.expectNoEvent(Duration.ofMillis(500))
.then(tp::complete)
.thenRequest(1)
.expectNext(5)
.verifyComplete();
assertThat(evicted).containsExactly(1, 2, 3, 4);
}
@Test
public void startError() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.create();
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom")))
.expectErrorMessage("boom")
.verify();
source.assertNoSubscribers();
start.assertNoSubscribers();
end.assertNoSubscribers();
}
@Test
public void errorAfterNextIsNeverTriggered() {
TestPublisher<String> source = TestPublisher.create();
AtomicReference<Throwable> errorDropped = new AtomicReference<>();
Hooks.onErrorDropped(errorDropped::set);
StepVerifier.withVirtualTime(() ->
new MonoDelayElement<>(source.mono(), 2, TimeUnit.SECONDS, defaultSchedulerForDelay()))
.expectSubscription()
.then(() -> source.next("foo").error(new IllegalStateException("boom")))
.expectNoEvent(Duration.ofSeconds(2))
.expectNext("foo")
.verifyComplete();
assertThat(errorDropped.get()).isNull();
}
@Test
public void cancelUpstreamOnceWhenRejected() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
vts.dispose();
TestPublisher<Object> testPublisher = TestPublisher.createCold();
testPublisher.emit("Hello");
StepVerifier.create(new MonoDelayElement<>(testPublisher.mono(), 2, TimeUnit.SECONDS, vts))
.verifyErrorSatisfies(e -> {
assertThat(e)
.isInstanceOf(RejectedExecutionException.class)
.hasMessage("Scheduler unavailable");
});
testPublisher.assertWasRequested();
testPublisher.assertCancelled(1);
}
@Test
public void shouldReturnFluxOfServices() {
TestPublisher<String> discoveryClient1Publisher = TestPublisher.createCold();
discoveryClient1Publisher.emit("serviceAFromClient1");
discoveryClient1Publisher.emit("serviceBFromClient1");
discoveryClient1Publisher.complete();
TestPublisher<String> discoveryClient2Publisher = TestPublisher.createCold();
discoveryClient2Publisher.emit("serviceCFromClient2");
discoveryClient2Publisher.complete();
when(discoveryClient1.getServices()).thenReturn(discoveryClient1Publisher.flux());
when(discoveryClient2.getServices()).thenReturn(discoveryClient2Publisher.flux());
ReactiveCompositeDiscoveryClient client = new ReactiveCompositeDiscoveryClient(
asList(discoveryClient1, discoveryClient2));
assertThat(client.description()).isEqualTo("Composite Reactive Discovery Client");
Flux<String> services = client.getServices();
StepVerifier.create(services).expectNext("serviceAFromClient1")
.expectNext("serviceBFromClient1").expectNext("serviceCFromClient2")
.expectComplete().verify();
}
@Test
public void shouldBeRequestedOneFromUpstreamTwiceInCaseOfConditional() throws InterruptedException {
TestPublisher<Long> publisher = TestPublisher.createCold();
ArrayList<Long> capture = new ArrayList<>();
ArrayList<Long> requested = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Long> switchTransformed = publisher.flux()
.doOnRequest(requested::add)
.switchOnFirst((first, innerFlux) -> innerFlux)
.filter(e -> false);
publisher.next(1L);
publisher.complete();
switchTransformed.subscribeWith(new LambdaSubscriber<>(capture::add, __ -> {}, latch::countDown, s -> s.request(1)));
latch.await(5, TimeUnit.SECONDS);
Assertions.assertThat(capture).isEmpty();
Assertions.assertThat(requested).containsExactly(1L, 1L);
}
@Test
public void startDoneThenComplete() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom"))
.complete())
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasNotDroppedErrors();
source.assertNoSubscribers();
//start doesn't cleanup and as such still has a subscriber
end.assertNoSubscribers();
}
@Test
public void depthCompleteCancelRace() {
for (int i = 0; i < 1000; i++) {
final TestPublisher<Integer> pp = TestPublisher.create();
final AssertSubscriber<Integer> ts = AssertSubscriber.create(1);
Mono.just(0)
.expandDeep(it -> pp)
.subscribe(ts);
Runnable r1 = pp::complete;
Runnable r2 = ts::cancel;
RaceTestUtils.race(r1, r2, Schedulers.single());
}
}
void nextFromResponderPublisher(
TestPublisher<Payload> responderPublisher, AssertSubscriber<Payload> requesterSubscriber) {
// ensures that downstream is not terminated so the requestChannel state is half-closed
responderPublisher.assertSubscribers(1);
requesterSubscriber.assertNotTerminated();
// ensures responderPublisher can send messages and outerSubscriber can receive them
requesterSubscriber.request(5);
responderPublisher.next(
DefaultPayload.create("rd1", "rm1"),
DefaultPayload.create("rd2"),
DefaultPayload.create("rd3", "rm3"),
DefaultPayload.create("rd4"),
DefaultPayload.create("rd5", "rm5"));
List<Payload> outerPayloads = requesterSubscriber.awaitAndAssertNextValueCount(5).values();
Assertions.assertThat(outerPayloads.stream().map(Payload::getDataUtf8))
.containsExactly("rd1", "rd2", "rd3", "rd4", "rd5");
Assertions.assertThat(outerPayloads.stream().map(Payload::hasMetadata))
.containsExactly(true, false, true, false, true);
Assertions.assertThat(outerPayloads.stream().map(Payload::getMetadataUtf8))
.containsExactly("rm1", "", "rm3", "", "rm5");
}
@Test
public void onBackpressureBufferMaxCallbackSourceEmitsAfterComplete() {
TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(TestPublisher.Violation.DEFER_CANCELLATION);
CopyOnWriteArrayList<Integer> overflown = new CopyOnWriteArrayList<>();
AtomicInteger producedCounter = new AtomicInteger();
StepVerifier.create(testPublisher.flux()
.doOnNext(i -> producedCounter.incrementAndGet())
.onBackpressureBuffer(3, overflown::add),
StepVerifierOptions.create().initialRequest(0).checkUnderRequesting(false))
.thenRequest(5)
.then(() -> testPublisher.next(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
.expectNext(1, 2, 3, 4, 5)
.thenAwait() //at this point the buffer is overrun since the range request was unbounded
.thenRequest(100) //requesting more empties the buffer before an overflow error is propagated
.expectNext(6, 7, 8)
.expectErrorMatches(Exceptions::isOverflow)
.verifyThenAssertThat()
.hasDroppedExactly(10, 11, 12, 13, 14, 15);
//the rest, asserted above, is dropped because the source was cancelled
assertThat(overflown).as("passed to overflow handler").containsExactly(9);
assertThat(producedCounter).as("bad source produced").hasValue(15);
}
@Test
public void malformedOnError() {
AtomicReference<Throwable> errorDropped = new AtomicReference<>();
Hooks.onErrorDropped(errorDropped::set);
Exception dropError = new IllegalStateException("malformedOnError");
TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(CLEANUP_ON_TERMINATE);
Flux<Integer> source = testPublisher.flux().hide();
new FluxMetrics<>(source, registry)
.subscribe();
testPublisher.next(1)
.complete()
.error(dropError);
Counter malformedMeter = registry
.find(METER_MALFORMED)
.counter();
assertThat(malformedMeter).isNotNull();
assertThat(malformedMeter.count()).isEqualTo(1);
assertThat(errorDropped).hasValue(dropError);
}
@Test
public void startDoneThenNext() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom"))
.next(1))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasNotDroppedErrors()
.hasNotDroppedElements();
source.assertNoSubscribers();
//start doesn't cleanup and as such still has a subscriber
end.assertNoSubscribers();
}
@Test
public void shouldNeverSendIncorrectRequestSizeToUpstream() throws InterruptedException {
TestPublisher<Long> publisher = TestPublisher.createCold();
AtomicLong capture = new AtomicLong();
ArrayList<Long> requested = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Long> switchTransformed = publisher.flux()
.doOnRequest(requested::add)
.switchOnFirst((first, innerFlux) -> innerFlux);
publisher.next(1L);
publisher.complete();
switchTransformed.subscribeWith(new LambdaSubscriber<>(capture::set, __ -> {}, latch::countDown, s -> s.request(1)));
latch.await(5, TimeUnit.SECONDS);
Assertions.assertThat(capture.get()).isEqualTo(1L);
Assertions.assertThat(requested).containsExactly(1L);
}
@Test
public void parallelSubscribeAndDispose() throws InterruptedException {
AtomicInteger nextCount = new AtomicInteger();
CountDownLatch cancelLatch = new CountDownLatch(1);
TestPublisher<Integer> source = TestPublisher.create();
Disposable d = source
.flux()
.parallel(3)
.doOnCancel(cancelLatch::countDown)
.subscribe(i -> nextCount.incrementAndGet());
source.next(1, 2, 3);
d.dispose();
source.emit(4, 5, 6);
boolean finished = cancelLatch.await(300, TimeUnit.MILLISECONDS);
assertThat(finished).as("cancelled latch").isTrue();
assertThat(d.isDisposed()).as("disposed").isTrue();
assertThat(nextCount.get()).as("received count").isEqualTo(3);
}