下面列出了怎么用io.reactivex.exceptions.MissingBackpressureException的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testOutgoingWithoutKafkaClusterWithoutBackPressure() throws InterruptedException {
container = KafkaTestBase.baseWeld();
KafkaTestBase.addConfig(myKafkaSinkConfig());
container.addBeanClasses(MyOutgoingBeanWithoutBackPressure.class);
WeldContainer weld = this.container.initialize();
nap();
MyOutgoingBeanWithoutBackPressure bean = weld
.select(MyOutgoingBeanWithoutBackPressure.class).get();
Throwable throwable = bean.error();
assertThat(throwable).isNotNull();
assertThat(throwable).isInstanceOf(MissingBackpressureException.class);
}
@Test
void emitsErrorOnResponseStreamBufferOverflow() {
System.setProperty(OcraftApiConfig.CLIENT_BUFFER_SIZE_RESPONSE_STREAM, "1");
System.setProperty(OcraftApiConfig.CLIENT_BUFFER_SIZE_RESPONSE_EVENT_BUS, String.valueOf(ITERATION_COUNT));
System.setProperty(OcraftApiConfig.CLIENT_BUFFER_SIZE_RESPONSE_BACKPRESSURE, "1");
refreshConfig();
prepareGame();
TestS2ClientSubscriber subscriber = TestS2ClientSubscriber.withBackPressure();
client.responseStream().subscribe(subscriber);
startRequestStream();
assertThatSubscriberReceivedError(subscriber, MissingBackpressureException.class);
}
@Test public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
@Test public void whenErrorStrategyUsed_thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}