下面列出了 io.reactivex.subscribers.TestSubscriber#assertNotComplete() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
public static void main(String[] args) {
Flowable<Integer> flowable = Flowable.range(1, 10)
.filter(n -> n % 2 == 0);
TestSubscriber<Integer> testSubscriber = flowable.test(0);
testSubscriber
.assertNever(n -> n % 2 == 1)
.requestMore(2)
.assertValues(2, 4)
.assertNotComplete()
.requestMore(3)
.assertValues(2, 4, 6, 8, 10)
.assertComplete();
testSubscriber = flowable.test(0);
testSubscriber
.assertNotComplete()
.requestMore(2)
.assertValues(2, 4)
.cancel();
testSubscriber
.assertNotComplete();
}
@Test
public void shouldHandleAddCommands() {
// given:
final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final TwoPSet<String> set = new TwoPSet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final TwoPSet.AddCommand<String> command1 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
final TwoPSet.AddCommand<String> command2 = new TwoPSet.AddCommand<>(set.getCrdtId(), "2");
final TwoPSet.AddCommand<String> command3 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
// when:
inputStream.onNext(command1);
inputStream.onNext(command2);
inputStream.onNext(command3);
// then:
assertThat(set, hasSize(2));
assertThat(subscriber.valueCount(), is(2));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
// given:
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final USet<UUID> set = new USet<>("ID_1");
set.subscribe(subscriber);
// when:
set.add(uuid1);
set.add(uuid2);
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), uuid1),
new AddCommandMatcher<>(set.getCrdtId(), uuid2)
));
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForRemoves() {
// given:
final UUID uuid1 = UUID.randomUUID();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final USet<UUID> set = new USet<>("ID_1");
set.subscribe(subscriber);
set.add(uuid1);
// when:
final Iterator<UUID> it = set.iterator();
it.next();
it.remove();
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), uuid1),
new RemoveCommandMatcher<>(set.getCrdtId(), uuid1)
));
}
@Test
public void shouldHandleAddCommands() {
// given:
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final USet<UUID> set = new USet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final USet.AddCommand<UUID> command1 = new USet.AddCommand<>(set.getCrdtId(), uuid1);
final USet.AddCommand<UUID> command2 = new USet.AddCommand<>(set.getCrdtId(), uuid2);
// when:
inputStream.onNext(command1);
inputStream.onNext(command2);
// then:
assertThat(set, hasSize(2));
assertThat(subscriber.valueCount(), is(2));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@Test
public void shouldHandleRemoveCommands() {
// given:
final UUID uuid1 = UUID.randomUUID();
final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final USet<UUID> set = new USet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final USet.AddCommand<UUID> command1 = new USet.AddCommand<>(set.getCrdtId(), uuid1);
final USet.RemoveCommand<UUID> command2 = new USet.RemoveCommand<>(set.getCrdtId(), uuid1);
// when:
inputStream.onNext(command1);
inputStream.onNext(command2);
// then:
assertThat(set, empty());
assertThat(subscriber.valueCount(), is(2));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final TwoPSet<String> set = new TwoPSet<>("ID_1");
set.subscribe(subscriber);
// when:
set.add("1");
set.add("2");
set.add("1");
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new AddCommandMatcher<>(set.getCrdtId(), "2")
));
}
@Test
public void shouldCallOnError() {
Throwable throwable = new IllegalStateException("Test exception");
//noinspection unchecked
PreparedOperation<String, Optional<String>, String> preparedOperation = mock(PreparedOperation.class);
when(preparedOperation.executeAsBlocking()).thenThrow(throwable);
TestSubscriber<Optional<String>> testSubscriber = TestSubscriber.create();
Flowable
.create(new FlowableOnSubscribeExecuteAsBlockingOptional<String, String>(preparedOperation), MISSING)
.subscribe(testSubscriber);
testSubscriber.assertError(throwable);
testSubscriber.assertNoValues();
testSubscriber.assertNotComplete();
}
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsTrue() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Predicate<Throwable> predicate = new Predicate<Throwable>() {
@Override
public boolean test(Throwable t) {
return t instanceof IllegalArgumentException;
}
};
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).retryIf(predicate).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertNotComplete();
}
@Test
public void shouldHandleRemoveCommands() {
// given:
final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final TwoPSet<String> set = new TwoPSet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final TwoPSet.AddCommand<String> command1 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
final TwoPSet.AddCommand<String> command2 = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
final TwoPSet.RemoveCommand<String> command3 = new TwoPSet.RemoveCommand<>(set.getCrdtId(), "1");
// when:
inputStream.onNext(command1);
inputStream.onNext(command2);
inputStream.onNext(command3);
// then:
assertThat(set, empty());
assertThat(subscriber.valueCount(), is(2));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForRemoves() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final ORSet<String> set = new ORSet<>("ID_1");
set.subscribe(subscriber);
set.add("1");
set.add("1");
// when:
final Iterator<String> it = set.iterator();
it.next();
it.remove();
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new RemoveCommandMatcher<>(set.getCrdtId(), "1", "1")
));
}
@Test
public void testWithScheduler() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertNotComplete();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
ts.assertValues(1, 2, 1, 2);
ts.assertNotComplete();
// next wait is 2 seconds so advancing by 1 should do nothing
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
ts.assertValues(1, 2, 1, 2);
ts.assertNotComplete();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
ts.assertValues(1, 2, 1, 2, 1, 2);
ts.assertError(ex);
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForRemoves() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final TwoPSet<String> set = new TwoPSet<>("ID_1");
set.subscribe(subscriber);
set.add("1");
set.add("1");
// when:
final Iterator<String> it = set.iterator();
it.next();
it.remove();
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new RemoveCommandMatcher<>(set.getCrdtId(), "1")
));
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final GSet<String> set = new GSet<>("ID_1");
set.subscribe(subscriber);
// when:
set.add("1");
set.add("2");
set.add("1");
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new AddCommandMatcher<>(set.getCrdtId(), "2"),
new AddCommandMatcher<>(set.getCrdtId(), "1")
));
}
@Test
public void shouldHandleAddCommands() {
// given:
final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final GSet<String> set = new GSet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final GSet.AddCommand<String> command1 = new GSet.AddCommand<>(set.getCrdtId(), "1");
final GSet.AddCommand<String> command2 = new GSet.AddCommand<>(set.getCrdtId(), "2");
final GSet.AddCommand<String> command3 = new GSet.AddCommand<>(set.getCrdtId(), "1");
// when:
inputStream.onNext(command1);
inputStream.onNext(command2);
inputStream.onNext(command3);
// then:
assertThat(set, hasSize(2));
assertThat(subscriber.valueCount(), is(2));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@Test
public void shouldHandleDuplicateCommands() {
// given:
final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final GSet<String> set = new GSet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final GSet.AddCommand<String> command = new GSet.AddCommand<>(set.getCrdtId(), "1");
// when:
inputStream.onNext(command);
inputStream.onNext(command);
// then:
assertThat(set, hasSize(1));
assertThat(subscriber.valueCount(), is(1));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@SuppressWarnings("unchecked")
@Test
public void itShouldSendCommandsOnUpdates() {
// given
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final MVRegister<String> register = new MVRegister<>(NODE_ID_1, CRDT_ID);
register.subscribe(subscriber);
// when
register.set("Hello World");
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new SetCommandMatcher<>(CRDT_ID, "Hello World")
));
// when
register.set("Hello World");
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new SetCommandMatcher<>(CRDT_ID, "Hello World")
));
// when
register.set("Goodbye World");
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new SetCommandMatcher<>(CRDT_ID, "Hello World"),
new SetCommandMatcher<>(CRDT_ID, "Goodbye World")
));
}
@Test
public void testInsertDuplicateId() {
final Mono<Person> insertMono = cosmosTemplate.insert(TEST_PERSON,
new PartitionKey(personInfo.getPartitionKeyFieldValue(TEST_PERSON)));
final TestSubscriber testSubscriber = new TestSubscriber();
insertMono.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNotComplete();
testSubscriber.assertTerminated();
assertThat(testSubscriber.errors()).hasSize(1);
assertThat(((List) testSubscriber.getEvents().get(1)).get(0))
.isInstanceOf(CosmosDBAccessException.class);
}
@Test
@SuppressWarnings("unchecked")
public void shouldSendAllCommandsAfterConnect() {
// given:
final CrdtStore store1 = new CrdtStore(NODE_ID_1);
store1.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
final TestSubscriber<CrdtDefinition> store1Subscriber = TestSubscriber.create();
store1.subscribe(store1Subscriber);
final CrdtStore store2 = new CrdtStore(NODE_ID_2);
store2.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
final TestSubscriber<CrdtDefinition> store2Subscriber = TestSubscriber.create();
store2.subscribe(store2Subscriber);
final SimpleCrdt crdt1 = store1.createCrdt(SimpleCrdt.class, CRDT_ID);
final TestSubscriber<CrdtCommand> crdt1Subscriber = TestSubscriber.create();
crdt1.subscribe(crdt1Subscriber);
final SimpleCrdt crdt2 = store2.createCrdt(SimpleCrdt.class, CRDT_ID);
final TestSubscriber<CrdtCommand> crdt2Subscriber = TestSubscriber.create();
crdt2.subscribe(crdt2Subscriber);
final SimpleCommand command1_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
crdt1.sendCommands(command1_1);
final SimpleCommand command2_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
crdt2.sendCommands(command2_1);
final SimpleCommand command3_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
final SimpleCommand command3_2 = new SimpleCommand(NODE_ID_1, CRDT_ID);
final SimpleCommand command3_3 = new SimpleCommand(NODE_ID_1, CRDT_ID);
crdt1.sendCommands(command3_1, command3_2, command3_3);
final SimpleCommand command4_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
final SimpleCommand command4_2 = new SimpleCommand(NODE_ID_2, CRDT_ID);
final SimpleCommand command4_3 = new SimpleCommand(NODE_ID_2, CRDT_ID);
crdt2.sendCommands(command4_1, command4_2, command4_3);
// when:
store1.connect(store2);
// then:
assertThat(store1Subscriber.valueCount(), is(1));
store1Subscriber.assertNotComplete();
store1Subscriber.assertNoErrors();
crdt1Subscriber.assertValues(
command1_1,
command3_1,
command3_2,
command3_3,
command2_1,
command4_1,
command4_2,
command4_3
);
crdt1Subscriber.assertNotComplete();
crdt1Subscriber.assertNoErrors();
assertThat(store2Subscriber.valueCount(), is(1));
store2Subscriber.assertNotComplete();
store2Subscriber.assertNoErrors();
crdt2Subscriber.assertValues(
command2_1,
command4_1,
command4_2,
command4_3,
command1_1,
command3_1,
command3_2,
command3_3
);
crdt2Subscriber.assertNotComplete();
crdt2Subscriber.assertNoErrors();
}
@Test
@SuppressWarnings("unchecked")
public void shouldSendCommandsToConnectedStore() {
// given:
final CrdtStore store1 = new CrdtStore(NODE_ID_1);
store1.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
final TestSubscriber<CrdtDefinition> store1Subscriber = TestSubscriber.create();
store1.subscribe(store1Subscriber);
final CrdtStore store2 = new CrdtStore(NODE_ID_2);
store2.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
final TestSubscriber<CrdtDefinition> store2Subscriber = TestSubscriber.create();
store2.subscribe(store2Subscriber);
store1.connect(store2);
final SimpleCrdt crdt1 = store1.createCrdt(SimpleCrdt.class, CRDT_ID);
final TestSubscriber<CrdtCommand> crdt1Subscriber = TestSubscriber.create();
crdt1.subscribe(crdt1Subscriber);
final SimpleCrdt crdt2 = (SimpleCrdt) store2.findCrdt(CRDT_ID).get();
final TestSubscriber<CrdtCommand> crdt2Subscriber = TestSubscriber.create();
crdt2.subscribe(crdt2Subscriber);
// when:
final SimpleCommand command1_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
crdt1.sendCommands(command1_1);
final SimpleCommand command2_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
crdt2.sendCommands(command2_1);
final SimpleCommand command3_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
final SimpleCommand command3_2 = new SimpleCommand(NODE_ID_1, CRDT_ID);
final SimpleCommand command3_3 = new SimpleCommand(NODE_ID_1, CRDT_ID);
crdt1.sendCommands(command3_1, command3_2, command3_3);
final SimpleCommand command4_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
final SimpleCommand command4_2 = new SimpleCommand(NODE_ID_2, CRDT_ID);
final SimpleCommand command4_3 = new SimpleCommand(NODE_ID_2, CRDT_ID);
crdt2.sendCommands(command4_1, command4_2, command4_3);
// then:
assertThat(store1Subscriber.valueCount(), is(1));
store1Subscriber.assertNotComplete();
store1Subscriber.assertNoErrors();
crdt1Subscriber.assertValues(
command1_1,
command2_1,
command3_1,
command3_2,
command3_3,
command4_1,
command4_2,
command4_3
);
crdt1Subscriber.assertNotComplete();
crdt1Subscriber.assertNoErrors();
assertThat(store2Subscriber.valueCount(), is(1));
store2Subscriber.assertNotComplete();
store2Subscriber.assertNoErrors();
crdt2Subscriber.assertValues(
command1_1,
command2_1,
command3_1,
command3_2,
command3_3,
command4_1,
command4_2,
command4_3
);
crdt2Subscriber.assertNotComplete();
crdt2Subscriber.assertNoErrors();
}