类io.reactivex.processors.ReplayProcessor源码实例Demo

下面列出了怎么用io.reactivex.processors.ReplayProcessor的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: RxShell   文件: HarvesterTest.java
/**
 * With an output processor stubbed on the command, checks that the line {@code "some-output"}
 * is observable on that processor and that the single emitted crop has exit code 255 and a
 * {@code null} buffer.
 */
@Test
public void testProcessors_output() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    final ReplayProcessor<String> outputProcessor = ReplayProcessor.create();
    when(cmd.getOutputProcessor()).thenReturn(outputProcessor);

    final TestSubscriber<OutputHarvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();

    // One regular line, then the marker line followed by the exit code.
    publisher.onNext("some-output");
    publisher.onNext(marker + " 255");

    // The replay processor hands the already-published line to this late subscriber.
    final TestSubscriber<String> replayed = outputProcessor.test();
    replayed.awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .assertValue("some-output");

    final OutputHarvester.Crop harvested = cropSubscriber
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.exitCode, is(255));
    assertThat(harvested.buffer, is(nullValue()));
}
 
源代码2 项目: RxShell   文件: HarvesterTest.java
/**
 * With an error processor stubbed on the command, checks that the line {@code "some-errors"}
 * is observable on that processor and that the single emitted crop has a {@code null} buffer.
 */
@Test
public void testProcessors_errors() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    final ReplayProcessor<String> errorProcessor = ReplayProcessor.create();
    when(cmd.getErrorProcessor()).thenReturn(errorProcessor);

    final TestSubscriber<Harvester.Crop> cropSubscriber =
            publisher.compose(harvesterFactory.forError(publisher, cmd)).test();

    // One regular line, then the bare marker line.
    publisher.onNext("some-errors");
    publisher.onNext(marker);

    // The replay processor hands the already-published line to this late subscriber.
    final TestSubscriber<String> replayed = errorProcessor.test();
    replayed.awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .assertValue("some-errors");

    final Harvester.Crop harvested = cropSubscriber
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.buffer, is(nullValue()));
}
 
源代码3 项目: wurmloch-crdt   文件: USetTest.java
/**
 * Pushes two add commands carrying distinct UUIDs into a {@code USet} and checks that the set
 * has two elements, the subscriber saw two commands, and no terminal event was signalled.
 */
@Test
public void shouldHandleAddCommands() {
    // given: two random ids, and a set wired to an incoming stream and an observing subscriber
    final UUID firstId = UUID.randomUUID();
    final UUID secondId = UUID.randomUUID();
    final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final USet<UUID> uSet = new USet<>("ID_1");
    uSet.subscribeTo(incoming);
    uSet.subscribe(observer);

    final USet.AddCommand<UUID> addFirst = new USet.AddCommand<>(uSet.getCrdtId(), firstId);
    final USet.AddCommand<UUID> addSecond = new USet.AddCommand<>(uSet.getCrdtId(), secondId);

    // when: both commands arrive on the incoming stream
    incoming.onNext(addFirst);
    incoming.onNext(addSecond);

    // then: both elements are present and both commands were published
    assertThat(uSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码4 项目: wurmloch-crdt   文件: USetTest.java
/**
 * Pushes an add command followed by a remove command for the same UUID into a {@code USet} and
 * checks that the set is empty, the subscriber saw two commands, and no terminal event occurred.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: one random id, and a set wired to an incoming stream and an observing subscriber
    final UUID elementId = UUID.randomUUID();
    final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final USet<UUID> uSet = new USet<>("ID_1");
    uSet.subscribeTo(incoming);
    uSet.subscribe(observer);

    final USet.AddCommand<UUID> addElement = new USet.AddCommand<>(uSet.getCrdtId(), elementId);
    final USet.RemoveCommand<UUID> removeElement = new USet.RemoveCommand<>(uSet.getCrdtId(), elementId);

    // when: the element is added and then removed
    incoming.onNext(addElement);
    incoming.onNext(removeElement);

    // then: nothing is left in the set, and both commands were published
    assertThat(uSet, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码5 项目: wurmloch-crdt   文件: MVRegisterTest.java
/**
 * Sets values on two registers, relays the first register's commands to the second one by hand,
 * and checks that the first register holds only {@code "42"} while the second holds both
 * {@code "42"} and {@code "Goodbye World"}.
 */
@Test
public void itShouldOverwriteOnlyPartialCommandsFromReceivedCommand() {
    // given: register A publishes to a recording subscriber, register B listens on a processor
    final TestSubscriber<MVRegister.SetCommand<String>> publishedByA = TestSubscriber.create();
    final Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> inboxOfB = ReplayProcessor.create();
    final MVRegister<String> registerA = new MVRegister<>(NODE_ID_1, CRDT_ID);
    registerA.subscribe(publishedByA);
    final MVRegister<String> registerB = new MVRegister<>(NODE_ID_2, CRDT_ID);
    registerB.subscribeTo(inboxOfB);

    registerA.set("Hello World");
    registerB.set("Goodbye World");
    // Relay A's first command to B.
    final MVRegister.SetCommand<String> firstFromA = publishedByA.values().get(0);
    inboxOfB.onNext(firstFromA);

    // when: A sets a new value and that second command is relayed as well
    registerA.set("42");
    final MVRegister.SetCommand<String> secondFromA = publishedByA.values().get(1);
    inboxOfB.onNext(secondFromA);

    // then
    assertThat(registerA.get(), containsInAnyOrder("42"));
    assertThat(registerB.get(), containsInAnyOrder("42", "Goodbye World"));
}
 
源代码6 项目: wurmloch-crdt   文件: ORSetTest.java
/**
 * Pushes three add commands into an {@code ORSet}, two of which carry the value {@code "1"}
 * under different UUIDs, and checks that the set has size two while three commands were published.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribeTo(incoming);
    orSet.subscribe(observer);

    final ORSet.AddCommand<String> addOne =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));
    final ORSet.AddCommand<String> addTwo =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("2", UUID.randomUUID()));
    final ORSet.AddCommand<String> addOneAgain =
            new ORSet.AddCommand<>(orSet.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));

    // when: all three commands arrive
    incoming.onNext(addOne);
    incoming.onNext(addTwo);
    incoming.onNext(addOneAgain);

    // then: two distinct values, three published commands, stream still open
    assertThat(orSet, hasSize(2));
    assertThat(observer.valueCount(), is(3));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码7 项目: wurmloch-crdt   文件: ORSetTest.java
/**
 * Delivers the very same {@code ORSet} add command twice and checks that the set has one element
 * and only one command was published.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribeTo(incoming);
    orSet.subscribe(observer);

    final ORSet.Element<String> element = new ORSet.Element<>("1", UUID.randomUUID());
    final ORSet.AddCommand<String> addOnce = new ORSet.AddCommand<>(orSet.getCrdtId(), element);

    // when: the identical command instance is delivered twice
    incoming.onNext(addOnce);
    incoming.onNext(addOnce);

    // then: one element, one published command, stream still open
    assertThat(orSet, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码8 项目: wurmloch-crdt   文件: GSetTest.java
/**
 * Pushes add commands for {@code "1"}, {@code "2"} and {@code "1"} again into a {@code GSet} and
 * checks that the set has size two and only two commands were published.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribeTo(incoming);
    gSet.subscribe(observer);

    final GSet.AddCommand<String> addOne = new GSet.AddCommand<>(gSet.getCrdtId(), "1");
    final GSet.AddCommand<String> addTwo = new GSet.AddCommand<>(gSet.getCrdtId(), "2");
    final GSet.AddCommand<String> addOneAgain = new GSet.AddCommand<>(gSet.getCrdtId(), "1");

    // when: all three commands arrive
    incoming.onNext(addOne);
    incoming.onNext(addTwo);
    incoming.onNext(addOneAgain);

    // then: two elements, two published commands, stream still open
    assertThat(gSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码9 项目: wurmloch-crdt   文件: GSetTest.java
/**
 * Delivers the very same {@code GSet} add command twice and checks that the set has one element
 * and only one command was published.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribeTo(incoming);
    gSet.subscribe(observer);

    final GSet.AddCommand<String> addOnce = new GSet.AddCommand<>(gSet.getCrdtId(), "1");

    // when: the identical command instance is delivered twice
    incoming.onNext(addOnce);
    incoming.onNext(addOnce);

    // then: one element, one published command, stream still open
    assertThat(gSet, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码10 项目: wurmloch-crdt   文件: TwoPSetTest.java
/**
 * Pushes add commands for {@code "1"}, {@code "2"} and {@code "1"} again into a {@code TwoPSet}
 * and checks that the set has size two and only two commands were published.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(incoming);
    twoPSet.subscribe(observer);

    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addTwo = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "2");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");

    // when: all three commands arrive
    incoming.onNext(addOne);
    incoming.onNext(addTwo);
    incoming.onNext(addOneAgain);

    // then: two elements, two published commands, stream still open
    assertThat(twoPSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码11 项目: wurmloch-crdt   文件: TwoPSetTest.java
/**
 * Pushes two add commands and one remove command, all for {@code "1"}, into a {@code TwoPSet}
 * and checks that the set is empty and two commands were published.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(incoming);
    twoPSet.subscribe(observer);

    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.RemoveCommand<String> removeOne = new TwoPSet.RemoveCommand<>(twoPSet.getCrdtId(), "1");

    // when: add, add, then remove
    incoming.onNext(addOne);
    incoming.onNext(addOneAgain);
    incoming.onNext(removeOne);

    // then: nothing left, two published commands, stream still open
    assertThat(twoPSet, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码12 项目: wurmloch-crdt   文件: TwoPSetTest.java
/**
 * Delivers a remove command for {@code "1"} before two add commands for the same value and
 * checks that the {@code TwoPSet} stays empty and only one command was published.
 */
@Test
public void shouldHandleRemoveCommandArrivesBeforeAddCommand() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(incoming);
    twoPSet.subscribe(observer);

    final TwoPSet.RemoveCommand<String> removeOne = new TwoPSet.RemoveCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");

    // when: the remove is delivered first, the adds afterwards
    incoming.onNext(removeOne);
    incoming.onNext(addOne);
    incoming.onNext(addOneAgain);

    // then: still empty, a single published command, stream still open
    assertThat(twoPSet, empty());
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码13 项目: redisson   文件: ElementsStream.java
/**
 * Builds a {@link Flowable} backed by a {@link ReplayProcessor}. Every downstream request of
 * {@code n} starts a {@code take(...)} chain that obtains futures from {@code callable} and
 * pushes their results into the processor.
 *
 * <p>The cancel hook is chained onto the returned {@code Flowable}. Previously
 * {@code p.doOnCancel(...)} was invoked inside the request callback and its result was
 * discarded; since {@code doOnCancel} returns a new {@code Flowable} rather than modifying
 * {@code p}, that hook was never part of the subscribed chain.
 *
 * @param callable supplies the future for the next element
 * @param <V> element type
 * @return a flowable that emits the results of the supplied futures
 */
public static <V> Flowable<V> takeElements(Supplier<RFuture<V>> callable) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    // Hoisted out of the request callback so the cancel hook below can reach the future most
    // recently stored by take(...).
    AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            take(callable, p, counter, futureRef);
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // Cancellation may arrive before anything was requested, in which case no future exists yet.
            RFuture<V> future = futureRef.get();
            if (future != null) {
                future.cancel(true);
            }
        }
    });
}
 
源代码14 项目: redisson   文件: ElementsStream.java
/**
 * Obtains one future from {@code factory}, records it in {@code futureRef}, and when it completes
 * either signals the error to {@code p} or emits the result and schedules the next take.
 *
 * <p>Once {@code counter} reaches zero the processor is completed and the chain stops. Previously
 * the method fell through and kept calling the factory after {@code onComplete()}, requesting
 * further elements on behalf of an already terminated processor.
 *
 * @param factory supplies the future for the next element
 * @param p processor that receives results, errors and completion
 * @param counter number of elements still to emit for the current request
 * @param futureRef holder updated with the most recently obtained future
 * @param <V> element type
 */
private static <V> void take(Supplier<RFuture<V>> factory, ReplayProcessor<V> p, AtomicLong counter, AtomicReference<RFuture<V>> futureRef) {
    RFuture<V> future = factory.get();
    futureRef.set(future);
    future.onComplete((res, e) -> {
        if (e != null) {
            p.onError(e);
            return;
        }

        p.onNext(res);
        if (counter.decrementAndGet() == 0) {
            p.onComplete();
            // Terminal signal sent: do not fetch any more elements.
            return;
        }

        take(factory, p, counter, futureRef);
    });
}
 
源代码15 项目: wurmloch-crdt   文件: MVRegisterTest.java
/**
 * Connects two {@code MVRegister}s to each other, sets a value on each, then hands a third
 * register the newer command followed by the older one and checks that the third register
 * contains only {@code "Goodbye World"}.
 */
@Test
public void itShouldIgnoreOlderValueFromReceivedCommands() {
    // given: registers A and B are cross-subscribed and recorded; register C listens on a processor
    final TestSubscriber<MVRegister.SetCommand<String>> publishedByA = TestSubscriber.create();
    final TestSubscriber<MVRegister.SetCommand<String>> publishedByB = TestSubscriber.create();
    final Processor<MVRegister.SetCommand<String>, MVRegister.SetCommand<String>> inboxOfC = ReplayProcessor.create();
    final MVRegister<String> registerA = new MVRegister<>(NODE_ID_1, CRDT_ID);
    registerA.subscribe(publishedByA);
    final MVRegister<String> registerB = new MVRegister<>(NODE_ID_2, CRDT_ID);
    registerB.subscribe(publishedByB);
    registerA.subscribeTo(registerB);
    registerB.subscribeTo(registerA);
    final MVRegister<String> registerC = new MVRegister<>(NODE_ID_3, CRDT_ID);
    registerC.subscribeTo(inboxOfC);

    // when: both registers are set and C receives the commands newest-first
    registerA.set("Hello World");
    registerB.set("Goodbye World");
    final MVRegister.SetCommand<String> older = publishedByA.values().get(0);
    final MVRegister.SetCommand<String> newer = publishedByB.values().get(1);
    inboxOfC.onNext(newer);
    inboxOfC.onNext(older);

    // then
    assertThat(registerC.get(), contains("Goodbye World"));
}
 
源代码16 项目: wurmloch-crdt   文件: LWWRegisterTest.java
/**
 * Connects two {@code LWWRegister}s to each other, sets a value on each, then hands a third
 * register the newer command followed by the older one and checks that the third register
 * holds {@code "Goodbye World"}.
 */
@SuppressWarnings("unchecked")
@Test
public void itShouldIgnoreOlderValueFromReceivedCommands() {
    // given: registers A and B are cross-subscribed and recorded; register C listens on a processor
    final TestSubscriber<LWWRegister.SetCommand<String>> publishedByA = TestSubscriber.create();
    final TestSubscriber<LWWRegister.SetCommand<String>> publishedByB = TestSubscriber.create();
    final Processor<LWWRegister.SetCommand<String>, LWWRegister.SetCommand<String>> inboxOfC = ReplayProcessor.create();
    final LWWRegister<String> registerA = new LWWRegister<>(NODE_ID_1, CRDT_ID);
    registerA.subscribe(publishedByA);
    final LWWRegister<String> registerB = new LWWRegister<>(NODE_ID_2, CRDT_ID);
    registerB.subscribe(publishedByB);
    registerA.subscribeTo(registerB);
    registerB.subscribeTo(registerA);
    final LWWRegister<String> registerC = new LWWRegister<>(NODE_ID_3, CRDT_ID);
    registerC.subscribeTo(inboxOfC);

    // when: both registers are set and C receives the commands newest-first
    registerA.set("Hello World");
    registerB.set("Goodbye World");
    final LWWRegister.SetCommand<String> older = publishedByA.values().get(0);
    final LWWRegister.SetCommand<String> newer = publishedByB.values().get(1);
    inboxOfC.onNext(newer);
    inboxOfC.onNext(older);

    // then
    assertThat(registerC.get(), is("Goodbye World"));
}
 
源代码17 项目: wurmloch-crdt   文件: ORSetTest.java
/**
 * Adds two {@code ORSet} elements that share the value {@code "1"} but have different UUIDs,
 * then removes both with a single remove command, and checks that the set is empty and all
 * three commands were published.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: a set wired to an incoming stream and an observing subscriber
    final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> incoming = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribeTo(incoming);
    orSet.subscribe(observer);

    final ORSet.Element<String> firstTag = new ORSet.Element<>("1", UUID.randomUUID());
    final ORSet.Element<String> secondTag = new ORSet.Element<>("1", UUID.randomUUID());
    final Set<ORSet.Element<String>> toRemove = new HashSet<>(Arrays.asList(firstTag, secondTag));
    final ORSet.AddCommand<String> addFirst = new ORSet.AddCommand<>(orSet.getCrdtId(), firstTag);
    final ORSet.AddCommand<String> addSecond = new ORSet.AddCommand<>(orSet.getCrdtId(), secondTag);
    final ORSet.RemoveCommand<String> removeBoth = new ORSet.RemoveCommand<>(orSet.getCrdtId(), toRemove);

    // when: both adds arrive, followed by the combined remove
    incoming.onNext(addFirst);
    incoming.onNext(addSecond);
    incoming.onNext(removeBoth);

    // then: nothing left, three published commands, stream still open
    assertThat(orSet, empty());
    assertThat(observer.valueCount(), is(3));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码18 项目: redisson   文件: CommandRxService.java
/**
 * Builds a {@link Flowable} backed by a {@link ReplayProcessor}. On each downstream request the
 * supplier is called; when the resulting future completes, its error is signalled, or its
 * non-null result is emitted, followed by completion.
 *
 * <p>The cancel hook is chained onto the returned {@code Flowable}. Previously
 * {@code p.doOnCancel(...)} was invoked inside the request callback and its result was
 * discarded; since {@code doOnCancel} returns a new {@code Flowable} rather than modifying
 * {@code p}, that hook was never part of the subscribed chain.
 *
 * @param supplier produces the future whose outcome is published
 * @param <R> result type
 * @return a flowable that emits at most one value per supplied future
 */
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
    ReplayProcessor<R> p = ReplayProcessor.create();
    // Holds the most recently obtained future so the cancel hook can reach it. Fully qualified
    // because this block cannot rely on an import being present.
    java.util.concurrent.atomic.AtomicReference<RFuture<R>> futureRef =
            new java.util.concurrent.atomic.AtomicReference<RFuture<R>>();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) throws Exception {
            RFuture<R> future;
            try {
                future = supplier.call();
            } catch (Exception e) {
                p.onError(e);
                return;
            }
            futureRef.set(future);

            future.onComplete((res, e) -> {
               if (e != null) {
                   p.onError(e);
                   return;
               }

               // A null result yields an empty, completed stream.
               if (res != null) {
                   p.onNext(res);
               }
               p.onComplete();
            });
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // Cancellation may arrive before anything was requested, in which case no future exists yet.
            RFuture<R> future = futureRef.get();
            if (future != null) {
                future.cancel(true);
            }
        }
    });
}
 
源代码19 项目: redisson   文件: RedissonListRx.java
/**
 * Builds a publisher that walks the list one element at a time, starting at {@code startIndex}.
 * Each downstream request of {@code n} fetches up to {@code n} elements via
 * {@code instance.getAsync(...)}; a {@code null} value completes the stream.
 *
 * <p>A failure while scheduling the next fetch is now delivered through {@code onError}.
 * Previously it was only printed with {@code printStackTrace()}, which left the subscriber
 * without any terminal signal.
 *
 * @param startIndex index of the first element to fetch
 * @param forward {@code true} to increment the index after each element, {@code false} to decrement it
 * @return publisher emitting the fetched elements
 */
private Publisher<V> iterator(int startIndex, boolean forward) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        // Index of the next element to fetch; moved only after a non-null value was emitted.
        private int currentIndex = startIndex;

        @Override
        public void accept(long n) throws Exception {
            instance.getAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                // A null value is treated as the end of the sequence.
                if (value == null) {
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                if (forward) {
                    currentIndex++;
                } else {
                    currentIndex--;
                }

                // Stop once the requested amount has been delivered.
                if (n - 1 == 0) {
                    return;
                }
                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Surface the failure to the subscriber instead of swallowing it.
                    p.onError(e1);
                }
            });
        }
    });
}
 
源代码20 项目: redisson   文件: RedissonTopicRx.java
/**
 * Builds a {@link Flowable} backed by a {@link ReplayProcessor}. Each downstream request of
 * {@code n} registers a message listener on the topic; the listener forwards every message to
 * the processor and, after {@code n} messages, removes itself and completes the processor.
 *
 * @param type class of the messages to listen for
 * @param <M> message type
 * @return a flowable emitting the received messages
 */
public <M> Flowable<M> getMessages(Class<M> type) {
    final ReplayProcessor<M> p = ReplayProcessor.create();
    return p.doOnRequest(n -> {
        final AtomicLong remaining = new AtomicLong(n);
        // Kept as an anonymous class because the listener passes itself to removeListenerAsync.
        final MessageListener<M> listener = new MessageListener<M>() {
            @Override
            public void onMessage(CharSequence channel, M msg) {
                p.onNext(msg);
                if (remaining.decrementAndGet() == 0) {
                    topic.removeListenerAsync(this);
                    p.onComplete();
                }
            }
        };
        final RFuture<Integer> registration = topic.addListenerAsync(type, listener);
        registration.onComplete((id, e) -> {
            if (e != null) {
                p.onError(e);
                return;
            }

            // NOTE(review): the Flowable returned by doOnCancel is not used, so this hook is
            // presumably never attached to what subscribers observe - confirm and rework.
            p.doOnCancel(() -> topic.removeListenerAsync(id));
        });
    });
}
 
源代码21 项目: redisson   文件: RedissonTransferQueueRx.java
/**
 * Builds a publisher that walks the queue one element at a time, starting at index 0.
 * Each downstream request of {@code n} fetches up to {@code n} elements via
 * {@code queue.getValueAsync(...)}; a {@code null} value completes the stream.
 *
 * <p>A failure while scheduling the next fetch is now delivered through {@code onError}.
 * Previously it was only printed with {@code printStackTrace()}, which left the subscriber
 * without any terminal signal.
 *
 * @return publisher emitting the fetched elements
 */
public Publisher<V> iterator() {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        // Index of the next element to fetch; advanced only after a non-null value was emitted.
        private int currentIndex = 0;

        @Override
        public void accept(long n) throws Exception {
            queue.getValueAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                // A null value is treated as the end of the sequence.
                if (value == null) {
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                currentIndex++;

                // Stop once the requested amount has been delivered.
                if (n - 1 == 0) {
                    return;
                }
                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Surface the failure to the subscriber instead of swallowing it.
                    p.onError(e1);
                }
            });
        }

    });
}
 
源代码22 项目: wurmloch-crdt   文件: MVRegister.java
/**
 * Creates a register by passing the two ids and a freshly created {@code ReplayProcessor} to the
 * superclass constructor.
 *
 * @param nodeId forwarded unchanged as the first superclass argument
 * @param crdtId forwarded unchanged as the second superclass argument
 */
public MVRegister(String nodeId, String crdtId) {
    super(nodeId, crdtId, ReplayProcessor.create());
}
 
源代码23 项目: wurmloch-crdt   文件: RGATest.java
/**
 * Runs four rounds in which two {@code RGA} replicas each insert an element at the same index
 * and then receive the other's command, checking after every round that both replicas contain
 * the same sequence. The command picked from each recording subscriber advances by two per round.
 */
@Test
public void itShouldAddElementsConcurrently() {
    // Positions of the next command to relay from each replica's recorded output.
    int cursor1 = 0;
    int cursor2 = 0;

    // given: two replicas, each with its own inbox processor and recording subscriber
    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inbox1 = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> published1 = TestSubscriber.create();
    final RGA<String> replica1 = new RGA<>(NODE_ID_1, CRDT_ID);
    replica1.subscribeTo(inbox1);
    replica1.subscribe(published1);

    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inbox2 = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> published2 = TestSubscriber.create();
    final RGA<String> replica2 = new RGA<>(NODE_ID_2, CRDT_ID);
    replica2.subscribeTo(inbox2);
    replica2.subscribe(published2);

    // when: round 1, both insert at the head
    replica1.add(0, "A1");
    replica2.add(0, "A2");
    inbox2.onNext(published1.values().get(cursor1));
    inbox1.onNext(published2.values().get(cursor2));

    // then
    assertThat(replica1, contains("A2", "A1"));
    assertThat(replica2, contains("A2", "A1"));

    // when: round 2, both insert at the head again
    replica1.add(0, "B1");
    replica2.add(0, "B2");
    cursor1 += 2;
    inbox2.onNext(published1.values().get(cursor1));
    cursor2 += 2;
    inbox1.onNext(published2.values().get(cursor2));

    // then
    assertThat(replica1, contains("B2", "B1", "A2", "A1"));
    assertThat(replica2, contains("B2", "B1", "A2", "A1"));

    // when: round 3, both insert at index 1
    replica1.add(1, "C1");
    replica2.add(1, "C2");
    cursor1 += 2;
    inbox2.onNext(published1.values().get(cursor1));
    cursor2 += 2;
    inbox1.onNext(published2.values().get(cursor2));

    // then
    assertThat(replica1, contains("B2", "C2", "C1", "B1", "A2", "A1"));
    assertThat(replica2, contains("B2", "C2", "C1", "B1", "A2", "A1"));

    // when: round 4, both append at index 6
    replica1.add(6, "D1");
    replica2.add(6, "D2");
    inbox2.onNext(published1.values().get(cursor1 + 2));
    inbox1.onNext(published2.values().get(cursor2 + 2));

    // then
    assertThat(replica1, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1"));
    assertThat(replica2, contains("B2", "C2", "C1", "B1", "A2", "A1", "D2", "D1"));
}
 
源代码24 项目: wurmloch-crdt   文件: RGATest.java
/**
 * Shares one element between two {@code RGA} replicas, then twice lets the first replica remove
 * index 0 while the second inserts a new element, relaying the resulting commands both ways and
 * checking that both replicas end up with the same single element each time.
 */
@Test
public void itShouldAddAndRemoveSingleElementConcurrently() {
    // Position of the next command to relay from replica 1's recorded output.
    int cursor1 = 0;

    // given: two replicas, each with its own inbox processor and recording subscriber
    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inbox1 = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> published1 = TestSubscriber.create();
    final RGA<String> replica1 = new RGA<>(NODE_ID_1, CRDT_ID);
    replica1.subscribeTo(inbox1);
    replica1.subscribe(published1);

    final Processor<RGA.RGACommand<String>, RGA.RGACommand<String>> inbox2 = ReplayProcessor.create();
    final TestSubscriber<RGA.RGACommand<String>> published2 = TestSubscriber.create();
    final RGA<String> replica2 = new RGA<>(NODE_ID_2, CRDT_ID);
    replica2.subscribeTo(inbox2);
    replica2.subscribe(published2);

    replica1.add("A");
    inbox2.onNext(published1.values().get(cursor1));
    // Replica 2's cursor starts from the same position as replica 1's.
    int cursor2 = cursor1;

    // when: replica 1 removes the shared element while replica 2 inserts "B" in front
    replica1.remove(0);
    replica2.add(0, "B");
    cursor1++;
    inbox2.onNext(published1.values().get(cursor1));
    cursor2++;
    inbox1.onNext(published2.values().get(cursor2));

    // then
    assertThat(replica1, contains("B"));
    assertThat(replica2, contains("B"));

    // when: replica 1 removes index 0 again while replica 2 inserts "C" at index 1
    replica1.remove(0);
    replica2.add(1, "C");
    inbox2.onNext(published1.values().get(cursor1 + 2));
    inbox1.onNext(published2.values().get(cursor2 + 2));

    // then
    assertThat(replica1, contains("C"));
    assertThat(replica2, contains("C"));
}
 
源代码25 项目: wurmloch-crdt   文件: SimpleCrdt.java
/**
 * Creates the CRDT by passing the two ids and a freshly created {@code ReplayProcessor} to the
 * superclass constructor.
 *
 * @param nodeId forwarded unchanged as the first superclass argument
 * @param crdtId forwarded unchanged as the second superclass argument
 */
SimpleCrdt(String nodeId, String crdtId) {
    super(nodeId, crdtId, ReplayProcessor.create());
}
 
源代码26 项目: eternity   文件: GoogleApiProcessor.java
/**
 * Creates an instance by delegating to the constructor that takes a processor, passing a
 * freshly created {@code ReplayProcessor}.
 */
protected GoogleApiProcessor() {
  this(ReplayProcessor.create());
}
 
源代码27 项目: eternity   文件: GoogleApiProcessor.java
/**
 * Creates an instance that stores the given processor in the {@code processor} field.
 *
 * @param processor the replay processor of {@code GoogleApiProcessor.Event} values to store
 */
private GoogleApiProcessor(ReplayProcessor<GoogleApiProcessor.Event> processor) {
  this.processor = processor;
}
 
源代码28 项目: RHub   文件: RxJava2Proxies.java
/**
 * Creates a proxy around a freshly created {@code ReplayProcessor}, configured with
 * {@code Roxy.TePolicy.PASS}.
 *
 * @return a new {@code RxJava2ProcProxy}
 */
public static RxJava2ProcProxy replayProcessorProxy() {
    return new RxJava2ProcProxy(ReplayProcessor.create(), Roxy.TePolicy.PASS);
}
 
源代码29 项目: RHub   文件: RxJava2Proxies.java
/**
 * Creates a proxy around the {@code toSerialized()} view of a freshly created
 * {@code ReplayProcessor}, configured with {@code Roxy.TePolicy.PASS}.
 *
 * @return a new {@code RxJava2ProcProxy}
 */
public static RxJava2ProcProxy serializedReplayProcessorProxy() {
    return new RxJava2ProcProxy(ReplayProcessor.create().toSerialized(), Roxy.TePolicy.PASS);
}
 
源代码30 项目: RHub   文件: RxJava2Proxies.java
/**
 * Creates a proxy around a freshly created {@code ReplayProcessor}, configured with
 * {@code Roxy.TePolicy.WRAP}.
 *
 * @return a new {@code RxJava2ProcProxy}
 */
public static RxJava2ProcProxy safeReplayProcessorProxy() {
    return new RxJava2ProcProxy(ReplayProcessor.create(), Roxy.TePolicy.WRAP);
}
 
 类所在包
 类方法
 同包方法