下面列出了怎么用io.reactivex.processors.PublishProcessor的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void convertsFromPublisherSubscribeWithDelay() {
PublishProcessor<String> processor = PublishProcessor.create();
processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
liveData.removeObserver(mObserver);
sBackgroundScheduler.triggerActions();
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("bar");
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
}
@Test
public void convertsFromPublisherWithMultipleObservers() {
final List<String> output2 = new ArrayList<>();
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
processor.onNext("bar");
// The second observer should only get the newest value and any later values.
liveData.observe(mLifecycleOwner, new Observer<String>() {
@Override
public void onChanged(@Nullable String s) {
output2.add(s);
}
});
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
assertThat(output2, is(Arrays.asList("bar", "baz")));
}
@Test
public void convertsFromPublisherWithMultipleObserversAfterInactive() {
final List<String> output2 = new ArrayList<>();
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
processor.onNext("bar");
// The second observer should only get the newest value and any later values.
liveData.observe(mLifecycleOwner, new Observer<String>() {
@Override
public void onChanged(@Nullable String s) {
output2.add(s);
}
});
liveData.removeObserver(mObserver);
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar")));
assertThat(output2, is(Arrays.asList("bar", "baz")));
}
@Before
public void init() {
this.mockWeb3j = mock(Web3j.class);
mockEthBlock = mock(EthBlock.class);
mockEventStoreService = mock(EventStoreService.class);
final EthBlock.Block mockBlock = mock(EthBlock.Block.class);
when(mockBlock.getNumber()).thenReturn(BLOCK_NUMBER);
when(mockBlock.getHash()).thenReturn(BLOCK_HASH);
when(mockBlock.getTimestamp()).thenReturn(BLOCK_TIMESTAMP);
when(mockEthBlock.getBlock()).thenReturn(mockBlock);
blockPublishProcessor = PublishProcessor.create();
when(mockWeb3j.blockFlowable(true)).thenReturn(blockPublishProcessor);
underTest = new PollingBlockSubscriptionStrategy(mockWeb3j,
NODE_NAME, mockEventStoreService, new DummyAsyncTaskService());
}
@Nullable
@Override
public View onCreateView(@NonNull LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
binding = DataBindingUtil.inflate(inflater, R.layout.section_selector_fragment, container, false);
binding.setPresenter(activity.getPresenter());
this.flowableProcessor = PublishProcessor.create();
this.sectionProcessor = PublishProcessor.create();
this.flowableOptions = PublishProcessor.create();
binding.actionButton.setOnClickListener(view -> {
presenter.onActionButtonClick();
});
presenter.init();
return binding.getRoot();
}
public SearchTEPresenter(SearchTEContractsModule.View view,
D2 d2,
SearchRepository searchRepository,
SchedulerProvider schedulerProvider,
AnalyticsHelper analyticsHelper,
@Nullable String initialProgram) {
this.view = view;
this.searchRepository = searchRepository;
this.d2 = d2;
this.schedulerProvider = schedulerProvider;
this.analyticsHelper = analyticsHelper;
compositeDisposable = new CompositeDisposable();
queryData = new HashMap<>();
queryProcessor = PublishProcessor.create();
mapProcessor = PublishProcessor.create();
enrollmentMapProcessor = PublishProcessor.create();
selectedProgram = initialProgram != null ? d2.programModule().programs().uid(initialProgram).blockingGet() : null;
currentProgram = BehaviorSubject.createDefault(initialProgram != null ? initialProgram : "");
}
public TeiDashboardPresenter(
TeiDashboardContracts.View view,
String teiUid, String programUid,
DashboardRepository dashboardRepository,
SchedulerProvider schedulerProvider,
AnalyticsHelper analyticsHelper,
PreferenceProvider preferenceProvider,
FilterManager filterManager
) {
this.view = view;
this.teiUid = teiUid;
this.programUid = programUid;
this.analyticsHelper = analyticsHelper;
this.dashboardRepository = dashboardRepository;
this.schedulerProvider = schedulerProvider;
this.preferenceProvider = preferenceProvider;
this.filterManager = filterManager;
compositeDisposable = new CompositeDisposable();
notesCounterProcessor = PublishProcessor.create();
}
SyncManagerPresenter(
D2 d2,
SchedulerProvider schedulerProvider,
GatewayValidator gatewayValidator,
PreferenceProvider preferenceProvider,
WorkManagerController workManagerController,
SettingsRepository settingsRepository,
SyncManagerContracts.View view,
AnalyticsHelper analyticsHelper) {
this.view = view;
this.d2 = d2;
this.settingsRepository = settingsRepository;
this.schedulerProvider = schedulerProvider;
this.preferenceProvider = preferenceProvider;
this.gatewayValidator = gatewayValidator;
this.workManagerController = workManagerController;
this.analyticsHelper = analyticsHelper;
checkData = PublishProcessor.create();
compositeDisposable = new CompositeDisposable();
}
public void reset() {
ouFilters = new ArrayList<>();
stateFilters = new ArrayList<>();
periodFilters = null;
catOptComboFilters = new ArrayList<>();
eventStatusFilters = new ArrayList<>();
assignedFilter = false;
ouFiltersApplied = new ObservableField<>(0);
stateFiltersApplied = new ObservableField<>(0);
periodFiltersApplied = new ObservableField<>(0);
catOptCombFiltersApplied = new ObservableField<>(0);
eventStatusFiltersApplied = new ObservableField<>(0);
assignedToMeApplied = new ObservableField<>(0);
filterProcessor = PublishProcessor.create();
ouTreeProcessor = PublishProcessor.create();
periodRequestProcessor = PublishProcessor.create();
}
@Test
public void testCommand_callback_sync() {
processor.attach(session);
int cnt = 100;
List<EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>>> testSubscribers = new ArrayList<>();
for (int j = 0; j < cnt; j++) {
List<String> cmds = new ArrayList<>();
for (int i = 0; i < 10; i++) cmds.add("echo " + i);
cmds.add("echo " + j);
PublishProcessor<String> outputListener = PublishProcessor.create();
TestSubscriber<String> outputObserver = outputListener.doOnEach(stringNotification -> TestHelper.sleep(1)).test();
final Cmd cmd = Cmd.builder(cmds).outputProcessor(outputListener).build();
final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd).subscribeOn(Schedulers.newThread()).test();
testSubscribers.add(new EnvVar<>(resultObserver, outputObserver));
}
for (EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>> envVar : testSubscribers) {
envVar.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
envVar.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
}
}
@Test
public void testCommand_callback_async() {
processor.attach(session);
int cnt = 100;
List<EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>>> testSubscribers = new ArrayList<>();
for (int j = 0; j < cnt; j++) {
List<String> cmds = new ArrayList<>();
for (int i = 0; i < 10; i++) cmds.add("echo " + i);
cmds.add("echo " + j);
PublishProcessor<String> outputListener = PublishProcessor.create();
TestSubscriber<String> outputObserver = outputListener.observeOn(Schedulers.newThread()).doOnEach(stringNotification -> TestHelper.sleep(1)).test();
final Cmd cmd = Cmd.builder(cmds).outputProcessor(outputListener).build();
final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd).subscribeOn(Schedulers.newThread()).test();
testSubscribers.add(new EnvVar<>(resultObserver, outputObserver));
}
for (EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>> envVar : testSubscribers) {
envVar.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
envVar.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
}
}
@Test
public void testBuilder_from() {
Cmd orig = Cmd.builder(UUID.randomUUID().toString())
.outputBuffer(false)
.errorBuffer(false)
.timeout(1337)
.outputProcessor(PublishProcessor.create())
.errorProcessor(PublishProcessor.create())
.build();
Cmd copy = Cmd.from(orig).build();
assertEquals(orig.getCommands(), copy.getCommands());
assertEquals(orig.isOutputBufferEnabled(), copy.isOutputBufferEnabled());
assertEquals(orig.isErrorBufferEnabled(), copy.isErrorBufferEnabled());
assertEquals(orig.getTimeout(), copy.getTimeout());
assertEquals(orig.getOutputProcessor(), copy.getOutputProcessor());
assertEquals(orig.getErrorProcessor(), copy.getErrorProcessor());
}
@Test
public void testBuild() {
final PublishProcessor<String> outputPub = PublishProcessor.create();
final PublishProcessor<String> errorPub = PublishProcessor.create();
Cmd cmd = Cmd.builder("cmd1")
.outputBuffer(false)
.errorBuffer(false)
.timeout(1337)
.outputProcessor(outputPub)
.errorProcessor(errorPub)
.build();
assertThat(cmd.getCommands(), contains("cmd1"));
assertThat(cmd.getOutputProcessor(), is(outputPub));
assertThat(cmd.getErrorProcessor(), is(errorPub));
assertThat(cmd.getTimeout(), is(1337L));
assertThat(cmd.isOutputBufferEnabled(), is(false));
assertThat(cmd.isErrorBufferEnabled(), is(false));
}
@Test
public void testExecute_oneshot_exception_no_buffers() throws IOException {
RxCmdShell shell = mock(RxCmdShell.class);
Exception ex = new IOException("Error message");
when(shell.open()).thenReturn(Single.error(ex));
when(shell.isAlive()).thenReturn(Single.just(false));
final PublishProcessor<String> errorPub = PublishProcessor.create();
final TestSubscriber<String> errorSub = errorPub.test();
final PublishProcessor<String> outputPub = PublishProcessor.create();
final TestSubscriber<String> outputSub = outputPub.test();
final Cmd.Result result = Cmd.builder("")
.outputBuffer(false)
.errorBuffer(false)
.outputProcessor(outputPub)
.errorProcessor(errorPub)
.execute(shell);
assertThat(result.getExitCode(), is(Cmd.ExitCode.EXCEPTION));
assertThat(result.getErrors(), is(nullValue()));
assertThat(result.getOutput(), is(nullValue()));
assertThat(errorSub.valueCount(), is(1));
assertThat(outputSub.valueCount(), is(0));
errorSub.assertComplete();
outputSub.assertComplete();
}
@Test
public void innerCancelled() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
pp1
.flatMap(v -> pp2)
.test();
pp1.onNext(1);
assertTrue("No subscribers?", pp2.hasSubscribers());
pp1.onError(new Exception());
assertFalse("Has subscribers?", pp2.hasSubscribers());
}
@Test
public void innerCancelled2() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
pp1
.concatMap(v -> pp2)
.test();
pp1.onNext(1);
assertTrue("No subscribers?", pp2.hasSubscribers());
pp1.onError(new Exception());
assertFalse("Has subscribers?", pp2.hasSubscribers());
}
@Override
public View onCreateView(
LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
View layout = inflater.inflate(R.layout.fragment_double_binding_textview, container, false);
unbinder = ButterKnife.bind(this, layout);
_resultEmitterSubject = PublishProcessor.create();
_disposable =
_resultEmitterSubject.subscribe(
aFloat -> {
_result.setText(String.valueOf(aFloat));
});
onNumberChanged();
_number2.requestFocus();
return layout;
}
@Override
public void onStart() {
super.onStart();
publishProcessor = PublishProcessor.create();
disposable =
publishProcessor
.startWith(getConnectivityStatus(getActivity()))
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
online -> {
if (online) {
log("You are online");
} else {
log("You are offline");
}
});
listenToNetworkConnectivity();
}
@Test
public void convertsFromPublisher() {
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
}
@Test
public void convertsFromPublisherThrowsException() {
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
IllegalStateException exception = new IllegalStateException("test exception");
try {
processor.onError(exception);
fail("Runtime Exception expected");
} catch (RuntimeException ex) {
assertEquals(ex.getCause(), exception);
}
}
@Test
public void convertsFromPublisherAfterInactive() {
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
liveData.removeObserver(mObserver);
processor.onNext("bar");
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "baz")));
}
@Test
public void convertsFromPublisherManagesSubscriptions() {
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
assertThat(processor.hasSubscribers(), is(false));
liveData.observe(mLifecycleOwner, mObserver);
// once the live data is active, there's a subscriber
assertThat(processor.hasSubscribers(), is(true));
liveData.removeObserver(mObserver);
// once the live data is inactive, the subscriber is removed
assertThat(processor.hasSubscribers(), is(false));
}
@Before
public void init() throws IOException {
this.mockWeb3j = mock(Web3j.class);
mockNewHeadsNotification = mock(NewHeadsNotification.class);
mockEventStoreService = mock(EventStoreService.class);
when(mockNewHeadsNotification.getParams()).thenReturn(new NewHeadNotificationParameter());
mockNewHead = mock(NewHead.class);
when(mockNewHead.getHash()).thenReturn(BLOCK_HASH);
blockPublishProcessor = PublishProcessor.create();
when(mockWeb3j.newHeadsNotifications()).thenReturn(blockPublishProcessor);
mockEthBlock = mock(EthBlock.class);
final EthBlock.Block mockBlock = mock(EthBlock.Block.class);
when(mockBlock.getNumber()).thenReturn(BLOCK_NUMBER);
when(mockBlock.getHash()).thenReturn(BLOCK_HASH);
when(mockBlock.getTimestamp()).thenReturn(Numeric.toBigInt(BLOCK_TIMESTAMP));
when(mockEthBlock.getBlock()).thenReturn(mockBlock);
final Request<?, EthBlock> mockRequest = mock(Request.class);
doReturn(mockRequest).when(mockWeb3j).ethGetBlockByHash(BLOCK_HASH, true);
when(mockRequest.send()).thenReturn(mockEthBlock);
underTest = new PubSubBlockSubscriptionStrategy(mockWeb3j, NODE_NAME,
mockEventStoreService, new DummyAsyncTaskService());
}
public ProgramEventDetailPresenter(
ProgramEventDetailContract.View view,
@NonNull ProgramEventDetailRepository programEventDetailRepository,
SchedulerProvider schedulerProvider,
FilterManager filterManager) {
this.view = view;
this.eventRepository = programEventDetailRepository;
this.schedulerProvider = schedulerProvider;
this.filterManager = filterManager;
eventInfoProcessor = PublishProcessor.create();
mapProcessor = PublishProcessor.create();
compositeDisposable = new CompositeDisposable();
}
public EventCapturePresenterImpl(EventCaptureContract.View view, String eventUid,
EventCaptureContract.EventCaptureRepository eventCaptureRepository,
RulesUtilsProvider rulesUtils,
ValueStore valueStore, SchedulerProvider schedulerProvider) {
this.view = view;
this.eventUid = eventUid;
this.eventCaptureRepository = eventCaptureRepository;
this.rulesUtils = rulesUtils;
this.valueStore = valueStore;
this.schedulerProvider = schedulerProvider;
this.currentPosition = 0;
this.sectionsToHide = new ArrayList<>();
this.currentSection = new ObservableField<>("");
this.errors = new HashMap<>();
this.emptyMandatoryFields = new HashMap<>();
this.canComplete = true;
this.sectionList = new ArrayList<>();
this.compositeDisposable = new CompositeDisposable();
currentSectionPosition = PublishProcessor.create();
sectionProcessor = PublishProcessor.create();
showCalculationProcessor = PublishProcessor.create();
progressProcessor = PublishProcessor.create();
sectionAdjustProcessor = PublishProcessor.create();
formAdjustProcessor = PublishProcessor.create();
notesCounterProcessor = PublishProcessor.create();
formFieldsProcessor = BehaviorSubject.createDefault(new ArrayList<>());
}
public FormAdapter(FragmentManager fm, Context context, SearchTEContractsModule.Presenter presenter) {
setHasStableIds(true);
this.processor = PublishProcessor.create();
this.processorOptionSet = PublishProcessor.create();
this.context = context;
this.presenter = presenter;
LayoutInflater layoutInflater = LayoutInflater.from(context);
attributeList = new ArrayList<>();
rows = new ArrayList<>();
rows.add(EDITTEXT, new EditTextRow(layoutInflater, processor, false, false));
rows.add(BUTTON, new FileRow(layoutInflater, processor, false));
rows.add(CHECKBOX, new RadioButtonRow(layoutInflater, processor, false));
rows.add(SPINNER, new SpinnerRow(layoutInflater, processor, processorOptionSet, false));
rows.add(COORDINATES, new CoordinateRow(layoutInflater, processor, false, FeatureType.POINT));
rows.add(TIME, new DateTimeRow(layoutInflater, processor, TIME, false));
rows.add(DATE, new DateTimeRow(layoutInflater, processor, DATE, false));
rows.add(DATETIME, new DateTimeRow(layoutInflater, processor, DATETIME, false));
rows.add(AGEVIEW, new AgeRow(layoutInflater, processor, false));
rows.add(YES_NO, new RadioButtonRow(layoutInflater, processor, false));
rows.add(ORG_UNIT, new OrgUnitRow(fm, layoutInflater, processor, false));
rows.add(IMAGE, new ImageRow(layoutInflater, processor, null));
rows.add(UNSUPPORTED, new UnsupportedRow(layoutInflater));
rows.add(LONG_TEXT, new EditTextRow(layoutInflater, processor, false, true));
rows.add(SCAN_CODE, new ScanTextRow(layoutInflater, processor, false));
}
public ReservedValuePresenter(ReservedValueRepository repository, D2 d2, SchedulerProvider schedulerProvider, ReservedValueContracts.View view) {
this.repository = repository;
this.d2 = d2;
this.updateProcessor = PublishProcessor.create();
this.schedulerProvider = schedulerProvider;
this.view = view;
this.disposable = new CompositeDisposable();
}
public DataEntryAdapter(@NonNull LayoutInflater layoutInflater,
@NonNull FragmentManager fragmentManager,
@NonNull DataEntryArguments dataEntryArguments) {
super(new DataEntryDiff());
setHasStableIds(true);
rows = new ArrayList<>();
viewModels = new ArrayList<>();
processor = PublishProcessor.create();
sectionProcessor = PublishProcessor.create();
imageSelector = new ObservableField<>("");
selectedSection = new ObservableField<>("");
this.processorOptionSet = PublishProcessor.create();
this.currentFocusUid = new MutableLiveData<>();
rows.add(EDITTEXT, new EditTextRow(layoutInflater, processor, true, dataEntryArguments.renderType(), false, currentFocusUid));
rows.add(BUTTON, new FileRow(layoutInflater, processor, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(CHECKBOX, new RadioButtonRow(layoutInflater, processor, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(SPINNER, new SpinnerRow(layoutInflater, processor, processorOptionSet, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(COORDINATES, new CoordinateRow(layoutInflater, processor, true, dataEntryArguments.renderType(), currentFocusUid, FeatureType.POINT));
rows.add(TIME, new DateTimeRow(layoutInflater, processor, TIME, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(DATE, new DateTimeRow(layoutInflater, processor, DATE, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(DATETIME, new DateTimeRow(layoutInflater, processor, DATETIME, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(AGEVIEW, new AgeRow(layoutInflater, processor, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(YES_NO, new RadioButtonRow(layoutInflater, processor, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(ORG_UNIT, new OrgUnitRow(fragmentManager, layoutInflater, processor, true, dataEntryArguments.renderType(), currentFocusUid));
rows.add(IMAGE, new ImageRow(layoutInflater, processor, dataEntryArguments.renderType()));
rows.add(UNSUPPORTED, new UnsupportedRow(layoutInflater));
rows.add(LONG_TEXT, new EditTextRow(layoutInflater, processor, true, dataEntryArguments.renderType(), true, currentFocusUid));
rows.add(DISPLAY, new DisplayRow(layoutInflater));
rows.add(PICTURE, new PictureRow(fragmentManager, layoutInflater, processor, true));
rows.add(SCAN_CODE, new ScanTextRow(layoutInflater, processor, true));
rows.add(SECTION, new SectionRow(layoutInflater, selectedSection, sectionProcessor));
rows.add(OPTION_SET_SELECT, new OptionSetRow(layoutInflater, processor, true, rendering, currentFocusUid));
}
@Inject
public ObservationListViewModel(ObservationRepository observationRepository) {
this.observationRepository = observationRepository;
observationListRequests = PublishProcessor.create();
observationList =
LiveDataReactiveStreams.fromPublisher(
observationListRequests
.doOnNext(__ -> loadingSpinnerVisibility.set(View.VISIBLE))
.switchMapSingle(this::getObservations)
.doOnNext(__ -> loadingSpinnerVisibility.set(View.GONE)));
}
@Test
public void testUpstreamTerminated_output() {
publisher.onComplete();
OutputHarvester.Crop crop = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test().assertComplete().assertValueCount(1).values().get(0);
assertThat(crop.isComplete, is(false));
publisher = PublishProcessor.create();
publisher.onError(new InterruptedException());
crop = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test().assertComplete().assertValueCount(1).values().get(0);
assertThat(crop.isComplete, is(false));
}