下面列出了 io.reactivex.processors.ReplayProcessor 类 API 的用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Feeds one regular line and one "marker + exit code" line through the output harvester and
 * checks that only the regular line reaches the command's output processor, while the
 * harvester emits a single Crop with exit code 255 and no buffer.
 */
@Test
public void testProcessors_output() {
    final String marker = UUID.randomUUID().toString();
    final ReplayProcessor<String> outputProcessor = ReplayProcessor.create();
    when(cmd.getMarker()).thenReturn(marker);
    when(cmd.getOutputProcessor()).thenReturn(outputProcessor);

    final TestSubscriber<OutputHarvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();

    publisher.onNext("some-output");
    publisher.onNext(marker + " 255");

    // The processor must have terminated within a second and hold exactly the regular line.
    final TestSubscriber<String> forwarded = outputProcessor.test();
    forwarded.awaitDone(1, TimeUnit.SECONDS);
    forwarded.assertNoTimeout();
    forwarded.assertValueCount(1);
    forwarded.assertValue("some-output");

    // The harvester itself must have terminated with exactly one result.
    cropSubscriber.awaitDone(1, TimeUnit.SECONDS);
    cropSubscriber.assertNoTimeout();
    cropSubscriber.assertValueCount(1);
    final OutputHarvester.Crop crop = cropSubscriber.values().get(0);

    assertThat(crop.exitCode, is(255));
    assertThat(crop.buffer, is(nullValue()));
}
/**
 * Feeds one regular line and one bare marker line through the error harvester and checks
 * that only the regular line reaches the command's error processor, while the harvester
 * emits a single Crop without a buffer.
 */
@Test
public void testProcessors_errors() {
    final String marker = UUID.randomUUID().toString();
    final ReplayProcessor<String> errorProcessor = ReplayProcessor.create();
    when(cmd.getMarker()).thenReturn(marker);
    when(cmd.getErrorProcessor()).thenReturn(errorProcessor);

    final TestSubscriber<Harvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forError(publisher, cmd)).test();

    publisher.onNext("some-errors");
    publisher.onNext(marker);

    // The processor must have terminated within a second and hold exactly the regular line.
    final TestSubscriber<String> forwarded = errorProcessor.test();
    forwarded.awaitDone(1, TimeUnit.SECONDS);
    forwarded.assertNoTimeout();
    forwarded.assertValueCount(1);
    forwarded.assertValue("some-errors");

    // The harvester itself must have terminated with exactly one result.
    cropSubscriber.awaitDone(1, TimeUnit.SECONDS);
    cropSubscriber.assertNoTimeout();
    cropSubscriber.assertValueCount(1);
    final Harvester.Crop crop = cropSubscriber.values().get(0);

    assertThat(crop.buffer, is(nullValue()));
}
/**
 * Sends two AddCommands with distinct UUIDs into a USet and expects both elements in the
 * set and two commands at the set's subscriber, with the stream still open and error-free.
 */
@Test
public void shouldHandleAddCommands() {
    // given:
    final UUID firstId = UUID.randomUUID();
    final UUID secondId = UUID.randomUUID();
    final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final USet<UUID> set = new USet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final USet.AddCommand<UUID> addFirst = new USet.AddCommand<>(set.getCrdtId(), firstId);
    final USet.AddCommand<UUID> addSecond = new USet.AddCommand<>(set.getCrdtId(), secondId);

    // when:
    commands.onNext(addFirst);
    commands.onNext(addSecond);

    // then:
    assertThat(set, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Adds a UUID to a USet and then removes the same UUID; expects an empty set and two
 * commands at the set's subscriber, with the stream still open and error-free.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given:
    final UUID elementId = UUID.randomUUID();
    final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final USet<UUID> set = new USet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final USet.AddCommand<UUID> add = new USet.AddCommand<>(set.getCrdtId(), elementId);
    final USet.RemoveCommand<UUID> remove = new USet.RemoveCommand<>(set.getCrdtId(), elementId);

    // when:
    commands.onNext(add);
    commands.onNext(remove);

    // then:
    assertThat(set, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Two MVRegisters are set independently; the second receives the first's commands one at a
 * time. After the first register is overwritten with "42" and that command is relayed, the
 * second register is expected to hold "42" alongside its own "Goodbye World".
 */
@Test
public void itShouldOverwriteOnlyPartialCommandsFromReceivedCommand() {
    // given
    final TestSubscriber<MVRegister.SetCommand<String>> emittedByFirst = TestSubscriber.create();
    final Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> feedForSecond = ReplayProcessor.create();

    final MVRegister<String> first = new MVRegister<>(NODE_ID_1, CRDT_ID);
    first.subscribe(emittedByFirst);

    final MVRegister<String> second = new MVRegister<>(NODE_ID_2, CRDT_ID);
    second.subscribeTo(feedForSecond);

    first.set("Hello World");
    second.set("Goodbye World");
    final MVRegister.SetCommand<String> initialCommand = emittedByFirst.values().get(0);
    feedForSecond.onNext(initialCommand);

    // when
    first.set("42");
    final MVRegister.SetCommand<String> overwriteCommand = emittedByFirst.values().get(1);
    feedForSecond.onNext(overwriteCommand);

    // then
    assertThat(first.get(), containsInAnyOrder("42"));
    assertThat(second.get(), containsInAnyOrder("42", "Goodbye World"));
}
/**
 * Sends three AddCommands into an ORSet, two of which carry the value "1" with different
 * UUIDs; expects a set of size 2 and three commands at the set's subscriber.
 */
@Test
public void shouldHandleAddCommands() {
    // given:
    final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final ORSet<String> set = new ORSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final ORSet.AddCommand<String> addOne =
            new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));
    final ORSet.AddCommand<String> addTwo =
            new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("2", UUID.randomUUID()));
    final ORSet.AddCommand<String> addOneAgain =
            new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));

    // when:
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then:
    assertThat(set, hasSize(2));
    assertThat(observer.valueCount(), is(3));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Sends the very same ORSet AddCommand instance twice; expects a set of size 1 and only one
 * command at the set's subscriber.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given:
    final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final ORSet<String> set = new ORSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final ORSet.AddCommand<String> add =
            new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));

    // when: the identical command object is delivered twice
    commands.onNext(add);
    commands.onNext(add);

    // then:
    assertThat(set, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Sends three AddCommands into a GSet, two of which carry the value "1"; expects a set of
 * size 2 and two commands at the set's subscriber.
 */
@Test
public void shouldHandleAddCommands() {
    // given:
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> set = new GSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final GSet.AddCommand<String> addOne = new GSet.AddCommand<>(set.getCrdtId(), "1");
    final GSet.AddCommand<String> addTwo = new GSet.AddCommand<>(set.getCrdtId(), "2");
    final GSet.AddCommand<String> addOneAgain = new GSet.AddCommand<>(set.getCrdtId(), "1");

    // when:
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then:
    assertThat(set, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Sends the very same GSet AddCommand instance twice; expects a set of size 1 and only one
 * command at the set's subscriber.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given:
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> set = new GSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final GSet.AddCommand<String> add = new GSet.AddCommand<>(set.getCrdtId(), "1");

    // when: the identical command object is delivered twice
    commands.onNext(add);
    commands.onNext(add);

    // then:
    assertThat(set, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Sends three AddCommands into a TwoPSet, two of which carry the value "1"; expects a set
 * of size 2 and two commands at the set's subscriber.
 */
@Test
public void shouldHandleAddCommands() {
    // given:
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> set = new TwoPSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addTwo = new TwoPSet.AddCommand<>(set.getCrdtId(), "2");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");

    // when:
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then:
    assertThat(set, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Adds "1" to a TwoPSet twice and then removes it; expects an empty set and two commands at
 * the set's subscriber.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given:
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> set = new TwoPSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final TwoPSet.AddCommand<String> add = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addAgain = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
    final TwoPSet.RemoveCommand<String> remove = new TwoPSet.RemoveCommand<>(set.getCrdtId(), "1");

    // when:
    commands.onNext(add);
    commands.onNext(addAgain);
    commands.onNext(remove);

    // then:
    assertThat(set, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Delivers a RemoveCommand for "1" before two AddCommands for "1"; expects the TwoPSet to
 * stay empty and exactly one command at the set's subscriber.
 */
@Test
public void shouldHandleRemoveCommandArrivesBeforeAddCommand() {
    // given:
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> set = new TwoPSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);
    final TwoPSet.RemoveCommand<String> remove = new TwoPSet.RemoveCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> lateAdd = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> lateAddAgain = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");

    // when: the removal is delivered first
    commands.onNext(remove);
    commands.onNext(lateAdd);
    commands.onNext(lateAddAgain);

    // then:
    assertThat(set, empty());
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Builds a Flowable that, on every downstream request of {@code n}, starts pulling elements
 * from {@code callable} via {@code take(...)} and publishes them through a ReplayProcessor.
 *
 * <p>The cancel hook is attached to the Flowable that is actually returned. Previously the
 * result of {@code p.doOnCancel(...)} was discarded, so the hook was never subscribed and
 * the in-flight future was never cancelled.
 *
 * @param callable supplies a new future for each element to fetch
 * @param <V> element type
 * @return a Flowable emitting the fetched elements
 */
public static <V> Flowable<V> takeElements(Supplier<RFuture<V>> callable) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    // Shared by the request and cancel hooks so that cancellation can reach the future that
    // is currently in flight. With several requests, the most recently started future wins.
    AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            take(callable, p, counter, futureRef);
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // Nothing has been requested yet if the reference is still empty.
            RFuture<V> pending = futureRef.get();
            if (pending != null) {
                pending.cancel(true);
            }
        }
    });
}
/**
 * Fetches one element from {@code factory} and publishes it to {@code p}, then repeats until
 * {@code counter} reaches zero or a fetch fails.
 *
 * <p>Once the counter hits zero the processor is completed and the loop stops. Previously
 * the method kept fetching after {@code onComplete()}, pulling further elements from the
 * source even though the processor was already terminated.
 *
 * @param factory supplies a new future for each element to fetch
 * @param p processor that receives elements, the error, or completion
 * @param counter number of elements still to deliver
 * @param futureRef updated with the future currently in flight
 * @param <V> element type
 */
private static <V> void take(Supplier<RFuture<V>> factory, ReplayProcessor<V> p, AtomicLong counter, AtomicReference<RFuture<V>> futureRef) {
    RFuture<V> future = factory.get();
    futureRef.set(future);
    future.onComplete((res, e) -> {
        if (e != null) {
            p.onError(e);
            return;
        }
        p.onNext(res);
        if (counter.decrementAndGet() == 0) {
            p.onComplete();
            // Demand satisfied: do not fetch (and lose) any further elements.
            return;
        }
        take(factory, p, counter, futureRef);
    });
}
/**
 * Two cross-subscribed MVRegisters are set one after the other. A third register then
 * receives the later command first and the earlier command second, and is expected to
 * contain only "Goodbye World".
 */
@Test
public void itShouldIgnoreOlderValueFromReceivedCommands() {
    // given
    final TestSubscriber<MVRegister.SetCommand<String>> emittedByFirst = TestSubscriber.create();
    final TestSubscriber<MVRegister.SetCommand<String>> emittedBySecond = TestSubscriber.create();
    final Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> feedForThird = ReplayProcessor.create();

    final MVRegister<String> first = new MVRegister<>(NODE_ID_1, CRDT_ID);
    first.subscribe(emittedByFirst);
    final MVRegister<String> second = new MVRegister<>(NODE_ID_2, CRDT_ID);
    second.subscribe(emittedBySecond);
    first.subscribeTo(second);
    second.subscribeTo(first);

    final MVRegister<String> third = new MVRegister<>(NODE_ID_3, CRDT_ID);
    third.subscribeTo(feedForThird);

    // when
    first.set("Hello World");
    second.set("Goodbye World");
    final MVRegister.SetCommand<String> staleCommand = emittedByFirst.values().get(0);
    final MVRegister.SetCommand<String> freshCommand = emittedBySecond.values().get(1);
    // Deliver out of order: newest first, oldest afterwards.
    feedForThird.onNext(freshCommand);
    feedForThird.onNext(staleCommand);

    // then
    assertThat(third.get(), contains("Goodbye World"));
}
/**
 * Two cross-subscribed LWWRegisters are set one after the other. A third register then
 * receives the later command first and the earlier command second, and is expected to hold
 * "Goodbye World".
 */
@SuppressWarnings("unchecked")
@Test
public void itShouldIgnoreOlderValueFromReceivedCommands() {
    // given
    final TestSubscriber<LWWRegister.SetCommand<String>> emittedByFirst = TestSubscriber.create();
    final TestSubscriber<LWWRegister.SetCommand<String>> emittedBySecond = TestSubscriber.create();
    final Processor<LWWRegister.SetCommand<String>, LWWRegister.SetCommand<String>> feedForThird = ReplayProcessor.create();

    final LWWRegister<String> first = new LWWRegister<>(NODE_ID_1, CRDT_ID);
    first.subscribe(emittedByFirst);
    final LWWRegister<String> second = new LWWRegister<>(NODE_ID_2, CRDT_ID);
    second.subscribe(emittedBySecond);
    first.subscribeTo(second);
    second.subscribeTo(first);

    final LWWRegister<String> third = new LWWRegister<>(NODE_ID_3, CRDT_ID);
    third.subscribeTo(feedForThird);

    // when
    first.set("Hello World");
    second.set("Goodbye World");
    final LWWRegister.SetCommand<String> staleCommand = emittedByFirst.values().get(0);
    final LWWRegister.SetCommand<String> freshCommand = emittedBySecond.values().get(1);
    // Deliver out of order: newest first, oldest afterwards.
    feedForThird.onNext(freshCommand);
    feedForThird.onNext(staleCommand);

    // then
    assertThat(third.get(), is("Goodbye World"));
}
/**
 * Adds two ORSet elements that share the value "1" but have different UUIDs, then removes
 * both with a single RemoveCommand; expects an empty set and three commands at the set's
 * subscriber.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given:
    final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final ORSet<String> set = new ORSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);

    final ORSet.Element<String> firstElement = new ORSet.Element<>("1", UUID.randomUUID());
    final ORSet.Element<String> secondElement = new ORSet.Element<>("1", UUID.randomUUID());
    final Set<ORSet.Element<String>> toRemove = new HashSet<>(Arrays.asList(firstElement, secondElement));

    final ORSet.AddCommand<String> addFirst = new ORSet.AddCommand<>(set.getCrdtId(), firstElement);
    final ORSet.AddCommand<String> addSecond = new ORSet.AddCommand<>(set.getCrdtId(), secondElement);
    final ORSet.RemoveCommand<String> removeBoth = new ORSet.RemoveCommand<>(set.getCrdtId(), toRemove);

    // when:
    commands.onNext(addFirst);
    commands.onNext(addSecond);
    commands.onNext(removeBoth);

    // then:
    assertThat(set, empty());
    assertThat(observer.valueCount(), is(3));
    observer.assertNotComplete().assertNoErrors();
}
/**
 * Wraps an asynchronous call in a Flowable: on each downstream request the supplier is
 * invoked and the future's outcome is published through a ReplayProcessor. A non-null
 * result is emitted followed by completion; a null result only completes the stream.
 *
 * <p>The cancel hook is attached to the Flowable that is actually returned. Previously the
 * result of {@code p.doOnCancel(...)} was discarded, so the hook was never subscribed and
 * the future was never cancelled.
 *
 * @param supplier creates the future; an exception it throws is signalled via onError
 * @param <R> result type
 * @return a Flowable emitting at most one element per started future
 */
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
    ReplayProcessor<R> p = ReplayProcessor.create();

    // A single object acts as both hooks so that the cancel hook can see the future that the
    // request hook started. A local class is used because an anonymous class cannot
    // implement two interfaces.
    class Hooks implements LongConsumer, Action {
        // Written by the request hook, read by the cancel hook (possibly on another thread).
        private volatile RFuture<R> pending;

        @Override
        public void accept(long t) throws Exception {
            RFuture<R> future;
            try {
                future = supplier.call();
            } catch (Exception e) {
                p.onError(e);
                return;
            }
            pending = future;
            future.onComplete((res, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                if (res != null) {
                    p.onNext(res);
                }
                p.onComplete();
            });
        }

        @Override
        public void run() throws Exception {
            // Nothing has been started yet if no request arrived before the cancel.
            RFuture<R> future = pending;
            if (future != null) {
                future.cancel(true);
            }
        }
    }

    Hooks hooks = new Hooks();
    return p.doOnRequest(hooks).doOnCancel(hooks);
}
/**
 * Publishes elements read one at a time via {@code instance.getAsync(index)}, starting at
 * {@code startIndex} and moving up or down by one per element.
 *
 * <p>A request of {@code n} reads up to {@code n} elements. A null value completes the
 * stream; a failed read is signalled via onError.
 *
 * @param startIndex index of the first element to read
 * @param forward {@code true} to increment the index after each element, {@code false} to decrement
 * @return a publisher of the elements read
 */
private Publisher<V> iterator(int startIndex, boolean forward) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        private int currentIndex = startIndex;

        @Override
        public void accept(long n) throws Exception {
            instance.getAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                // A null value marks the end of the sequence.
                if (value == null) {
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                if (forward) {
                    currentIndex++;
                } else {
                    currentIndex--;
                }

                // Stop once this request's demand has been served.
                if (n - 1 == 0) {
                    return;
                }

                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Signal the failure downstream instead of printing it and leaving the
                    // stream without a terminal event.
                    p.onError(e1);
                }
            });
        }
    });
}
/**
 * Builds a Flowable of topic messages of the given type. On each downstream request of
 * {@code n} a listener is registered on the topic; it forwards messages to a
 * ReplayProcessor and, after {@code n} messages, removes itself and completes the stream.
 *
 * <p>The cancel hook is attached to the Flowable that is actually returned. Previously the
 * result of {@code p.doOnCancel(...)} was discarded, so the hook was never subscribed and
 * the listener stayed registered after a cancel.
 *
 * <p>NOTE(review): only the most recent registration is tracked for cancellation; listeners
 * from earlier requests are still only removed once their own counter reaches zero.
 *
 * @param type message class passed to the topic when registering the listener
 * @param <M> message type
 * @return a Flowable emitting the received messages
 */
public <M> Flowable<M> getMessages(Class<M> type) {
    ReplayProcessor<M> p = ReplayProcessor.create();

    // A single object acts as both hooks so that the cancel hook can reach the listener
    // registration that the request hook started.
    class Hooks implements LongConsumer, Action {
        // Written by the request hook, read by the cancel hook (possibly on another thread).
        private volatile RFuture<Integer> registration;

        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    p.onNext(msg);
                    if (counter.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        p.onComplete();
                    }
                }
            });
            registration = t;
            t.onComplete((id, e) -> {
                if (e != null) {
                    p.onError(e);
                }
            });
        }

        @Override
        public void run() throws Exception {
            RFuture<Integer> t = registration;
            if (t == null) {
                // Cancelled before anything was requested: no listener to remove.
                return;
            }
            // Remove the listener as soon as its id is known.
            // NOTE(review): assumes onComplete also fires for an already-completed future - confirm.
            t.onComplete((id, e) -> {
                if (e == null) {
                    topic.removeListenerAsync(id);
                }
            });
        }
    }

    Hooks hooks = new Hooks();
    return p.doOnRequest(hooks).doOnCancel(hooks);
}
/**
 * Publishes elements read one at a time via {@code queue.getValueAsync(index)}, starting at
 * index 0 and incrementing the index after each element.
 *
 * <p>A request of {@code n} reads up to {@code n} elements. A null value completes the
 * stream; a failed read is signalled via onError.
 *
 * @return a publisher of the elements read
 */
public Publisher<V> iterator() {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        private int currentIndex = 0;

        @Override
        public void accept(long n) throws Exception {
            queue.getValueAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                // A null value marks the end of the sequence.
                if (value == null) {
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                currentIndex++;

                // Stop once this request's demand has been served.
                if (n - 1 == 0) {
                    return;
                }

                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Signal the failure downstream instead of printing it and leaving the
                    // stream without a terminal event.
                    p.onError(e1);
                }
            });
        }
    });
}
/**
 * Creates an MVRegister by passing the given ids and a newly created ReplayProcessor to the
 * superclass constructor.
 *
 * @param nodeId forwarded unchanged to the superclass constructor
 * @param crdtId forwarded unchanged to the superclass constructor
 */
public MVRegister(String nodeId, String crdtId) {
super(nodeId, crdtId, ReplayProcessor.create());
}
/**
 * Two RGA replicas each insert an element at the same position, then exchange the resulting
 * commands. After every one of the four rounds both replicas must hold the same sequence.
 */
@Test
public void itShouldAddElementsConcurrently() {
    // given
    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> feedForFirst = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> emittedByFirst = TestSubscriber.create();
    final RGA<String> first = new RGA<>(NODE_ID_1, CRDT_ID);
    first.subscribeTo(feedForFirst);
    first.subscribe(emittedByFirst);

    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> feedForSecond = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> emittedBySecond = TestSubscriber.create();
    final RGA<String> second = new RGA<>(NODE_ID_2, CRDT_ID);
    second.subscribeTo(feedForSecond);
    second.subscribe(emittedBySecond);

    // The command exchanged in each round sits at index 0, 2, 4 and 6 of the emitted values.
    // NOTE(review): presumably each subscriber also records the command received from the
    // peer, which would explain the step of two - confirm.

    // when: round 1, both insert at the head
    first.add(0, "A1");
    second.add(0, "A2");
    feedForSecond.onNext(emittedByFirst.values().get(0));
    feedForFirst.onNext(emittedBySecond.values().get(0));

    // then
    assertThat(first, contains("A2", "A1"));
    assertThat(second, contains("A2", "A1"));

    // when: round 2, both insert at the head again
    first.add(0, "B1");
    second.add(0, "B2");
    feedForSecond.onNext(emittedByFirst.values().get(2));
    feedForFirst.onNext(emittedBySecond.values().get(2));

    // then
    assertThat(first, contains("B2", "B1", "A2", "A1"));
    assertThat(second, contains("B2", "B1", "A2", "A1"));

    // when: round 3, both insert at index 1
    first.add(1, "C1");
    second.add(1, "C2");
    feedForSecond.onNext(emittedByFirst.values().get(4));
    feedForFirst.onNext(emittedBySecond.values().get(4));

    // then
    assertThat(first, contains("B2", "C2", "C1", "B1", "A2", "A1"));
    assertThat(second, contains("B2", "C2", "C1", "B1", "A2", "A1"));

    // when: round 4, both insert at index 6
    first.add(6, "D1");
    second.add(6, "D2");
    feedForSecond.onNext(emittedByFirst.values().get(6));
    feedForFirst.onNext(emittedBySecond.values().get(6));

    // then
    assertThat(first, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1"));
    assertThat(second, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1"));
}
/**
 * One RGA replica removes an element while the other concurrently adds one; after the
 * commands are exchanged both replicas must hold the same single element. Two rounds are
 * run.
 */
@Test
public void itShouldAddAndRemoveSingleElementConcurrently() {
    // given
    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> feedForFirst = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> emittedByFirst = TestSubscriber.create();
    final RGA<String> first = new RGA<>(NODE_ID_1, CRDT_ID);
    first.subscribeTo(feedForFirst);
    first.subscribe(emittedByFirst);

    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> feedForSecond = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> emittedBySecond = TestSubscriber.create();
    final RGA<String> second = new RGA<>(NODE_ID_2, CRDT_ID);
    second.subscribeTo(feedForSecond);
    second.subscribe(emittedBySecond);

    // Seed both replicas with "A": the first replica's initial command is at index 0.
    first.add("A");
    feedForSecond.onNext(emittedByFirst.values().get(0));

    // when: round 1 exchanges the commands found at index 1 on both sides
    first.remove(0);
    second.add(0, "B");
    feedForSecond.onNext(emittedByFirst.values().get(1));
    feedForFirst.onNext(emittedBySecond.values().get(1));

    // then
    assertThat(first, contains("B"));
    assertThat(second, contains("B"));

    // when: round 2 exchanges the commands found at index 3 on both sides
    first.remove(0);
    second.add(1, "C");
    feedForSecond.onNext(emittedByFirst.values().get(3));
    feedForFirst.onNext(emittedBySecond.values().get(3));

    // then
    assertThat(first, contains("C"));
    assertThat(second, contains("C"));
}
/**
 * Creates a SimpleCrdt by passing the given ids and a newly created ReplayProcessor to the
 * superclass constructor.
 *
 * @param nodeId forwarded unchanged to the superclass constructor
 * @param crdtId forwarded unchanged to the superclass constructor
 */
SimpleCrdt(String nodeId, String crdtId) {
super(nodeId, crdtId, ReplayProcessor.create());
}
/**
 * Creates a GoogleApiProcessor backed by a newly created ReplayProcessor, delegating to the
 * constructor that takes the processor.
 */
protected GoogleApiProcessor() {
this(ReplayProcessor.create());
}
/**
 * Creates a GoogleApiProcessor that stores the given processor in its {@code processor} field.
 *
 * @param processor the ReplayProcessor of events to store
 */
private GoogleApiProcessor(ReplayProcessor<GoogleApiProcessor.Event> processor) {
this.processor = processor;
}
/**
 * Creates an RxJava2ProcProxy around a newly created ReplayProcessor, using
 * {@code Roxy.TePolicy.PASS}.
 *
 * @return the new proxy
 */
public static RxJava2ProcProxy replayProcessorProxy() {
return new RxJava2ProcProxy(ReplayProcessor.create(), Roxy.TePolicy.PASS);
}
/**
 * Creates an RxJava2ProcProxy around the serialized form ({@code toSerialized()}) of a newly
 * created ReplayProcessor, using {@code Roxy.TePolicy.PASS}.
 *
 * @return the new proxy
 */
public static RxJava2ProcProxy serializedReplayProcessorProxy() {
return new RxJava2ProcProxy(ReplayProcessor.create().toSerialized(), Roxy.TePolicy.PASS);
}
/**
 * Creates an RxJava2ProcProxy around a newly created ReplayProcessor, using
 * {@code Roxy.TePolicy.WRAP}.
 *
 * @return the new proxy
 */
public static RxJava2ProcProxy safeReplayProcessorProxy() {
return new RxJava2ProcProxy(ReplayProcessor.create(), Roxy.TePolicy.WRAP);
}