io.reactivex.processors.PublishProcessor#create ( )源码实例Demo

下面列出了io.reactivex.processors.PublishProcessor#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void convertsFromPublisherSubscribeWithDelay() {
    PublishProcessor<String> processor = PublishProcessor.create();
    processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("foo");
    liveData.removeObserver(mObserver);
    sBackgroundScheduler.triggerActions();
    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("bar");
    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
}
 
源代码2 项目: RxShell   文件: CmdProcessorTest.java
@Test
public void testCommand_callback_async() {
    processor.attach(session);

    int cnt = 100;
    List<EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>>> testSubscribers = new ArrayList<>();
    for (int j = 0; j < cnt; j++) {
        List<String> cmds = new ArrayList<>();
        for (int i = 0; i < 10; i++) cmds.add("echo " + i);
        cmds.add("echo " + j);

        PublishProcessor<String> outputListener = PublishProcessor.create();
        TestSubscriber<String> outputObserver = outputListener.observeOn(Schedulers.newThread()).doOnEach(stringNotification -> TestHelper.sleep(1)).test();
        final Cmd cmd = Cmd.builder(cmds).outputProcessor(outputListener).build();
        final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd).subscribeOn(Schedulers.newThread()).test();
        testSubscribers.add(new EnvVar<>(resultObserver, outputObserver));
    }
    for (EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>> envVar : testSubscribers) {
        envVar.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
        envVar.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
    }
}
 
@Before
public void init() {
    this.mockWeb3j = mock(Web3j.class);

    mockEthBlock = mock(EthBlock.class);
    mockEventStoreService = mock(EventStoreService.class);
    final EthBlock.Block mockBlock = mock(EthBlock.Block.class);

    when(mockBlock.getNumber()).thenReturn(BLOCK_NUMBER);
    when(mockBlock.getHash()).thenReturn(BLOCK_HASH);
    when(mockBlock.getTimestamp()).thenReturn(BLOCK_TIMESTAMP);
    when(mockEthBlock.getBlock()).thenReturn(mockBlock);

    blockPublishProcessor = PublishProcessor.create();
    when(mockWeb3j.blockFlowable(true)).thenReturn(blockPublishProcessor);

    underTest = new PollingBlockSubscriptionStrategy(mockWeb3j,
            NODE_NAME, mockEventStoreService, new DummyAsyncTaskService());
}
 
@Nullable
@Override
public View onCreateView(@NonNull LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
    binding = DataBindingUtil.inflate(inflater, R.layout.section_selector_fragment, container, false);
    binding.setPresenter(activity.getPresenter());
    this.flowableProcessor = PublishProcessor.create();
    this.sectionProcessor = PublishProcessor.create();
    this.flowableOptions = PublishProcessor.create();

    binding.actionButton.setOnClickListener(view -> {
        presenter.onActionButtonClick();
    });

    presenter.init();

    return binding.getRoot();
}
 
源代码5 项目: akarnokd-misc   文件: FlatMapWithTwoErrors.java
@Test
public void innerCancelled2() {
    PublishProcessor<Integer> pp1 = PublishProcessor.create();
    PublishProcessor<Integer> pp2 = PublishProcessor.create();
    
    pp1
    .concatMap(v -> pp2)
    .test();

    pp1.onNext(1);
    assertTrue("No subscribers?", pp2.hasSubscribers());

    pp1.onError(new Exception());
    
    assertFalse("Has subscribers?", pp2.hasSubscribers());
}
 
@Override
public View onCreateView(
    LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
  View layout = inflater.inflate(R.layout.fragment_double_binding_textview, container, false);
  unbinder = ButterKnife.bind(this, layout);

  _resultEmitterSubject = PublishProcessor.create();

  _disposable =
      _resultEmitterSubject.subscribe(
          aFloat -> {
            _result.setText(String.valueOf(aFloat));
          });

  onNumberChanged();
  _number2.requestFocus();

  return layout;
}
 
SyncManagerPresenter(
        D2 d2,
        SchedulerProvider schedulerProvider,
        GatewayValidator gatewayValidator,
        PreferenceProvider preferenceProvider,
        WorkManagerController workManagerController,
        SettingsRepository settingsRepository,
        SyncManagerContracts.View view,
        AnalyticsHelper analyticsHelper) {
    this.view = view;
    this.d2 = d2;
    this.settingsRepository = settingsRepository;
    this.schedulerProvider = schedulerProvider;
    this.preferenceProvider = preferenceProvider;
    this.gatewayValidator = gatewayValidator;
    this.workManagerController = workManagerController;
    this.analyticsHelper = analyticsHelper;
    checkData = PublishProcessor.create();
    compositeDisposable = new CompositeDisposable();
}
 
@Override
public void onStart() {
  super.onStart();

  publishProcessor = PublishProcessor.create();

  disposable =
      publishProcessor
          .startWith(getConnectivityStatus(getActivity()))
          .distinctUntilChanged()
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(
              online -> {
                if (online) {
                  log("You are online");
                } else {
                  log("You are offline");
                }
              });

  listenToNetworkConnectivity();
}
 
@Test
public void convertsFromPublisher() {
    PublishProcessor<String> processor = PublishProcessor.create();
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("foo");
    processor.onNext("bar");
    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
}
 
源代码10 项目: akarnokd-misc   文件: PublishProcessorPerf.java
@Setup
public void setup(Blackhole bh) {
    unbounded = PublishProcessor.create();
    unbounded.subscribe(new PerfConsumer(bh));

    bounded = PublishProcessor.create();
    bounded.subscribe(new PerfBoundedSubscriber(bh, 1000 * 1000));

    subject = PublishSubject.create();
    subject.subscribe(new PerfConsumer(bh));
}
 
源代码11 项目: ground-android   文件: ObservationListViewModel.java
@Inject
public ObservationListViewModel(ObservationRepository observationRepository) {
  this.observationRepository = observationRepository;
  observationListRequests = PublishProcessor.create();
  observationList =
      LiveDataReactiveStreams.fromPublisher(
          observationListRequests
              .doOnNext(__ -> loadingSpinnerVisibility.set(View.VISIBLE))
              .switchMapSingle(this::getObservations)
              .doOnNext(__ -> loadingSpinnerVisibility.set(View.GONE)));
}
 
@Before
public void init() throws IOException {
    this.mockWeb3j = mock(Web3j.class);

    mockNewHeadsNotification = mock(NewHeadsNotification.class);
    mockEventStoreService = mock(EventStoreService.class);
    when(mockNewHeadsNotification.getParams()).thenReturn(new NewHeadNotificationParameter());

    mockNewHead = mock(NewHead.class);
    when(mockNewHead.getHash()).thenReturn(BLOCK_HASH);

    blockPublishProcessor = PublishProcessor.create();
    when(mockWeb3j.newHeadsNotifications()).thenReturn(blockPublishProcessor);

    mockEthBlock = mock(EthBlock.class);
    final EthBlock.Block mockBlock = mock(EthBlock.Block.class);

    when(mockBlock.getNumber()).thenReturn(BLOCK_NUMBER);
    when(mockBlock.getHash()).thenReturn(BLOCK_HASH);
    when(mockBlock.getTimestamp()).thenReturn(Numeric.toBigInt(BLOCK_TIMESTAMP));
    when(mockEthBlock.getBlock()).thenReturn(mockBlock);

    final Request<?, EthBlock> mockRequest = mock(Request.class);
    doReturn(mockRequest).when(mockWeb3j).ethGetBlockByHash(BLOCK_HASH, true);

    when(mockRequest.send()).thenReturn(mockEthBlock);

    underTest = new PubSubBlockSubscriptionStrategy(mockWeb3j, NODE_NAME,
            mockEventStoreService, new DummyAsyncTaskService());
}
 
源代码13 项目: dhis2-android-capture-app   文件: FormAdapter.java
public FormAdapter(FragmentManager fm, Context context, SearchTEContractsModule.Presenter presenter) {
    setHasStableIds(true);
    this.processor = PublishProcessor.create();
    this.processorOptionSet = PublishProcessor.create();
    this.context = context;
    this.presenter = presenter;

    LayoutInflater layoutInflater = LayoutInflater.from(context);
    attributeList = new ArrayList<>();
    rows = new ArrayList<>();

    rows.add(EDITTEXT, new EditTextRow(layoutInflater, processor, false, false));
    rows.add(BUTTON, new FileRow(layoutInflater, processor, false));
    rows.add(CHECKBOX, new RadioButtonRow(layoutInflater, processor, false));
    rows.add(SPINNER, new SpinnerRow(layoutInflater, processor, processorOptionSet, false));
    rows.add(COORDINATES, new CoordinateRow(layoutInflater, processor, false, FeatureType.POINT));
    rows.add(TIME, new DateTimeRow(layoutInflater, processor, TIME, false));
    rows.add(DATE, new DateTimeRow(layoutInflater, processor, DATE, false));
    rows.add(DATETIME, new DateTimeRow(layoutInflater, processor, DATETIME, false));
    rows.add(AGEVIEW, new AgeRow(layoutInflater, processor, false));
    rows.add(YES_NO, new RadioButtonRow(layoutInflater, processor, false));
    rows.add(ORG_UNIT, new OrgUnitRow(fm, layoutInflater, processor, false));
    rows.add(IMAGE, new ImageRow(layoutInflater, processor, null));
    rows.add(UNSUPPORTED, new UnsupportedRow(layoutInflater));
    rows.add(LONG_TEXT, new EditTextRow(layoutInflater, processor, false, true));
    rows.add(SCAN_CODE, new ScanTextRow(layoutInflater, processor, false));
}
 
源代码14 项目: akarnokd-misc   文件: BufferWithConditionAndTime.java
@Test
public void test() {
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
        o.buffer(o.filter(v -> v.contains("Start")), 
                v -> Flowable.merge(o.filter(w -> w.contains("End")), 
                        Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f)
.subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
    
}
 
源代码15 项目: storio   文件: AbstractEmissionCheckerTest.java
@Test
public void shouldAssertThatNoExpectedValuesLeft() {
    Queue<String> expectedValues = new LinkedList<String>();

    expectedValues.add("1");
    expectedValues.add("2");
    expectedValues.add("3");

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            return publishProcessor
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            onNextObtained(s);
                        }
                    });
        }
    };

    Disposable disposable = emissionChecker.subscribe();

    publishProcessor.onNext("1");

    // "1"
    emissionChecker.awaitNextExpectedValue();

    publishProcessor.onNext("2");

    // "2"
    emissionChecker.awaitNextExpectedValue();

    publishProcessor.onNext("3");

    // "3"
    emissionChecker.awaitNextExpectedValue();

    // Should not throw exception
    emissionChecker.assertThatNoExpectedValuesLeft();

    disposable.dispose();
}
 
源代码16 项目: RHub   文件: States.java
@Setup(Level.Iteration)
public void setup() {
    ps = PublishProcessor.create();
}
 
源代码17 项目: storio   文件: AbstractEmissionCheckerTest.java
@Test
public void shouldStoreItemsInQueueAndThenAwaitNextExpectedValues() {
    final Queue<String> expectedValues = new LinkedList<String>();

    expectedValues.add("1");
    expectedValues.add("2");
    expectedValues.add("3");

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            return publishProcessor
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            onNextObtained(s);
                        }
                    });
        }
    };

    final Disposable disposable = emissionChecker.subscribe();

    // Notice: We emit several values before awaiting any of them

    publishProcessor.onNext("1");
    publishProcessor.onNext("2");
    publishProcessor.onNext("3");

    // Now we should successfully await all these items one by one
    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();

    emissionChecker.assertThatNoExpectedValuesLeft();

    disposable.dispose();
}
 
源代码18 项目: RHub   文件: Akka.java
private static void genericProcessorPrxoy() {
    final int BUFF = 64;
    system = ActorSystem.create("Example");
    materializer = ActorMaterializer.create(
            ActorMaterializerSettings.create(system)
                    .withDebugLogging(true)
                    .withAutoFusing(true)
                    .withInputBuffer(BUFF, BUFF)
            ,
            system);

    AkkaProcProxy demo = new AkkaProcProxy(PublishProcessor.create(),
            Roxy.TePolicy.WRAP, materializer);

    demo.pub().subscribe(new Subscriber() {

        Subscription s;

        @Override
        public void onSubscribe(Subscription subscription) {
            System.out.println("onSubscribe");
            s = subscription;
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("onError: ");
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

    demo.addUpstream(Source.range(201, 2000)
            .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), materializer));
    new Thread(() -> {
        try {
            Thread.sleep(333);
            demo.emit("AAA");
            demo.emit(5);
            demo.emit(5);
            demo.addUpstream(Flowable.range(1, 100));
            demo.emit(5);
            demo.emit(5);
            demo.emit(5);
            demo.emit(5);
            Thread.sleep(3333);
            demo.complete();
            system.terminate();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();


    //system.terminate();
}
 
源代码19 项目: RxShell   文件: HarvesterTest.java
@Override
public void setup() throws Exception {
    super.setup();
    harvesterFactory = new Harvester.Factory();
    publisher = PublishProcessor.create();
}
 
源代码20 项目: storio   文件: AbstractEmissionCheckerTest.java
@Test
public void shouldThrowExceptionBecauseFlowableEmittedUnexpectedItemAfterExpectedSequence() {
    List<Throwable> errors = TestHelper.trackPluginErrors();

    final Queue<String> expectedValues = new LinkedList<String>();

    expectedValues.add("1");
    expectedValues.add("2");
    expectedValues.add("3");

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            return publishProcessor
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            onNextObtained(s);
                        }
                    });
        }
    };

    final Disposable disposable = emissionChecker.subscribe();

    publishProcessor.onNext("1");
    publishProcessor.onNext("2");
    publishProcessor.onNext("3");

    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();

    emissionChecker.assertThatNoExpectedValuesLeft();

    assertThat(errors).isEmpty();

    publishProcessor.onNext("4");

    assertThat(errors).hasSize(1);
    assertThat(errors.get(0).getCause())
            .hasMessage("Received emission, but no more emissions were expected: obtained 4, expectedValues = [], obtainedValues = []");

    disposable.dispose();
}