下面列出了io.reactivex.processors.PublishProcessor#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void convertsFromPublisherSubscribeWithDelay() {
PublishProcessor<String> processor = PublishProcessor.create();
processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
liveData.removeObserver(mObserver);
sBackgroundScheduler.triggerActions();
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("bar");
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
}
@Test
public void testCommand_callback_async() {
processor.attach(session);
int cnt = 100;
List<EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>>> testSubscribers = new ArrayList<>();
for (int j = 0; j < cnt; j++) {
List<String> cmds = new ArrayList<>();
for (int i = 0; i < 10; i++) cmds.add("echo " + i);
cmds.add("echo " + j);
PublishProcessor<String> outputListener = PublishProcessor.create();
TestSubscriber<String> outputObserver = outputListener.observeOn(Schedulers.newThread()).doOnEach(stringNotification -> TestHelper.sleep(1)).test();
final Cmd cmd = Cmd.builder(cmds).outputProcessor(outputListener).build();
final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd).subscribeOn(Schedulers.newThread()).test();
testSubscribers.add(new EnvVar<>(resultObserver, outputObserver));
}
for (EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>> envVar : testSubscribers) {
envVar.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
envVar.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
}
}
@Before
public void init() {
this.mockWeb3j = mock(Web3j.class);
mockEthBlock = mock(EthBlock.class);
mockEventStoreService = mock(EventStoreService.class);
final EthBlock.Block mockBlock = mock(EthBlock.Block.class);
when(mockBlock.getNumber()).thenReturn(BLOCK_NUMBER);
when(mockBlock.getHash()).thenReturn(BLOCK_HASH);
when(mockBlock.getTimestamp()).thenReturn(BLOCK_TIMESTAMP);
when(mockEthBlock.getBlock()).thenReturn(mockBlock);
blockPublishProcessor = PublishProcessor.create();
when(mockWeb3j.blockFlowable(true)).thenReturn(blockPublishProcessor);
underTest = new PollingBlockSubscriptionStrategy(mockWeb3j,
NODE_NAME, mockEventStoreService, new DummyAsyncTaskService());
}
@Nullable
@Override
public View onCreateView(@NonNull LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
binding = DataBindingUtil.inflate(inflater, R.layout.section_selector_fragment, container, false);
binding.setPresenter(activity.getPresenter());
this.flowableProcessor = PublishProcessor.create();
this.sectionProcessor = PublishProcessor.create();
this.flowableOptions = PublishProcessor.create();
binding.actionButton.setOnClickListener(view -> {
presenter.onActionButtonClick();
});
presenter.init();
return binding.getRoot();
}
@Test
public void innerCancelled2() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
pp1
.concatMap(v -> pp2)
.test();
pp1.onNext(1);
assertTrue("No subscribers?", pp2.hasSubscribers());
pp1.onError(new Exception());
assertFalse("Has subscribers?", pp2.hasSubscribers());
}
@Override
public View onCreateView(
LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
View layout = inflater.inflate(R.layout.fragment_double_binding_textview, container, false);
unbinder = ButterKnife.bind(this, layout);
_resultEmitterSubject = PublishProcessor.create();
_disposable =
_resultEmitterSubject.subscribe(
aFloat -> {
_result.setText(String.valueOf(aFloat));
});
onNumberChanged();
_number2.requestFocus();
return layout;
}
SyncManagerPresenter(
D2 d2,
SchedulerProvider schedulerProvider,
GatewayValidator gatewayValidator,
PreferenceProvider preferenceProvider,
WorkManagerController workManagerController,
SettingsRepository settingsRepository,
SyncManagerContracts.View view,
AnalyticsHelper analyticsHelper) {
this.view = view;
this.d2 = d2;
this.settingsRepository = settingsRepository;
this.schedulerProvider = schedulerProvider;
this.preferenceProvider = preferenceProvider;
this.gatewayValidator = gatewayValidator;
this.workManagerController = workManagerController;
this.analyticsHelper = analyticsHelper;
checkData = PublishProcessor.create();
compositeDisposable = new CompositeDisposable();
}
@Override
public void onStart() {
super.onStart();
publishProcessor = PublishProcessor.create();
disposable =
publishProcessor
.startWith(getConnectivityStatus(getActivity()))
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
online -> {
if (online) {
log("You are online");
} else {
log("You are offline");
}
});
listenToNetworkConnectivity();
}
@Test
public void convertsFromPublisher() {
PublishProcessor<String> processor = PublishProcessor.create();
LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
liveData.observe(mLifecycleOwner, mObserver);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
}
@Setup
public void setup(Blackhole bh) {
unbounded = PublishProcessor.create();
unbounded.subscribe(new PerfConsumer(bh));
bounded = PublishProcessor.create();
bounded.subscribe(new PerfBoundedSubscriber(bh, 1000 * 1000));
subject = PublishSubject.create();
subject.subscribe(new PerfConsumer(bh));
}
@Inject
public ObservationListViewModel(ObservationRepository observationRepository) {
this.observationRepository = observationRepository;
observationListRequests = PublishProcessor.create();
observationList =
LiveDataReactiveStreams.fromPublisher(
observationListRequests
.doOnNext(__ -> loadingSpinnerVisibility.set(View.VISIBLE))
.switchMapSingle(this::getObservations)
.doOnNext(__ -> loadingSpinnerVisibility.set(View.GONE)));
}
@Before
public void init() throws IOException {
this.mockWeb3j = mock(Web3j.class);
mockNewHeadsNotification = mock(NewHeadsNotification.class);
mockEventStoreService = mock(EventStoreService.class);
when(mockNewHeadsNotification.getParams()).thenReturn(new NewHeadNotificationParameter());
mockNewHead = mock(NewHead.class);
when(mockNewHead.getHash()).thenReturn(BLOCK_HASH);
blockPublishProcessor = PublishProcessor.create();
when(mockWeb3j.newHeadsNotifications()).thenReturn(blockPublishProcessor);
mockEthBlock = mock(EthBlock.class);
final EthBlock.Block mockBlock = mock(EthBlock.Block.class);
when(mockBlock.getNumber()).thenReturn(BLOCK_NUMBER);
when(mockBlock.getHash()).thenReturn(BLOCK_HASH);
when(mockBlock.getTimestamp()).thenReturn(Numeric.toBigInt(BLOCK_TIMESTAMP));
when(mockEthBlock.getBlock()).thenReturn(mockBlock);
final Request<?, EthBlock> mockRequest = mock(Request.class);
doReturn(mockRequest).when(mockWeb3j).ethGetBlockByHash(BLOCK_HASH, true);
when(mockRequest.send()).thenReturn(mockEthBlock);
underTest = new PubSubBlockSubscriptionStrategy(mockWeb3j, NODE_NAME,
mockEventStoreService, new DummyAsyncTaskService());
}
public FormAdapter(FragmentManager fm, Context context, SearchTEContractsModule.Presenter presenter) {
setHasStableIds(true);
this.processor = PublishProcessor.create();
this.processorOptionSet = PublishProcessor.create();
this.context = context;
this.presenter = presenter;
LayoutInflater layoutInflater = LayoutInflater.from(context);
attributeList = new ArrayList<>();
rows = new ArrayList<>();
rows.add(EDITTEXT, new EditTextRow(layoutInflater, processor, false, false));
rows.add(BUTTON, new FileRow(layoutInflater, processor, false));
rows.add(CHECKBOX, new RadioButtonRow(layoutInflater, processor, false));
rows.add(SPINNER, new SpinnerRow(layoutInflater, processor, processorOptionSet, false));
rows.add(COORDINATES, new CoordinateRow(layoutInflater, processor, false, FeatureType.POINT));
rows.add(TIME, new DateTimeRow(layoutInflater, processor, TIME, false));
rows.add(DATE, new DateTimeRow(layoutInflater, processor, DATE, false));
rows.add(DATETIME, new DateTimeRow(layoutInflater, processor, DATETIME, false));
rows.add(AGEVIEW, new AgeRow(layoutInflater, processor, false));
rows.add(YES_NO, new RadioButtonRow(layoutInflater, processor, false));
rows.add(ORG_UNIT, new OrgUnitRow(fm, layoutInflater, processor, false));
rows.add(IMAGE, new ImageRow(layoutInflater, processor, null));
rows.add(UNSUPPORTED, new UnsupportedRow(layoutInflater));
rows.add(LONG_TEXT, new EditTextRow(layoutInflater, processor, false, true));
rows.add(SCAN_CODE, new ScanTextRow(layoutInflater, processor, false));
}
@Test
public void test() {
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("End")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
}
@Test
public void shouldAssertThatNoExpectedValuesLeft() {
Queue<String> expectedValues = new LinkedList<String>();
expectedValues.add("1");
expectedValues.add("2");
expectedValues.add("3");
final PublishProcessor<String> publishProcessor = PublishProcessor.create();
AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
@NonNull
@Override
public Disposable subscribe() {
return publishProcessor
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
onNextObtained(s);
}
});
}
};
Disposable disposable = emissionChecker.subscribe();
publishProcessor.onNext("1");
// "1"
emissionChecker.awaitNextExpectedValue();
publishProcessor.onNext("2");
// "2"
emissionChecker.awaitNextExpectedValue();
publishProcessor.onNext("3");
// "3"
emissionChecker.awaitNextExpectedValue();
// Should not throw exception
emissionChecker.assertThatNoExpectedValuesLeft();
disposable.dispose();
}
@Setup(Level.Iteration)
public void setup() {
ps = PublishProcessor.create();
}
@Test
public void shouldStoreItemsInQueueAndThenAwaitNextExpectedValues() {
final Queue<String> expectedValues = new LinkedList<String>();
expectedValues.add("1");
expectedValues.add("2");
expectedValues.add("3");
final PublishProcessor<String> publishProcessor = PublishProcessor.create();
final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
@NonNull
@Override
public Disposable subscribe() {
return publishProcessor
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
onNextObtained(s);
}
});
}
};
final Disposable disposable = emissionChecker.subscribe();
// Notice: We emit several values before awaiting any of them
publishProcessor.onNext("1");
publishProcessor.onNext("2");
publishProcessor.onNext("3");
// Now we should successfully await all these items one by one
emissionChecker.awaitNextExpectedValue();
emissionChecker.awaitNextExpectedValue();
emissionChecker.awaitNextExpectedValue();
emissionChecker.assertThatNoExpectedValuesLeft();
disposable.dispose();
}
private static void genericProcessorPrxoy() {
final int BUFF = 64;
system = ActorSystem.create("Example");
materializer = ActorMaterializer.create(
ActorMaterializerSettings.create(system)
.withDebugLogging(true)
.withAutoFusing(true)
.withInputBuffer(BUFF, BUFF)
,
system);
AkkaProcProxy demo = new AkkaProcProxy(PublishProcessor.create(),
Roxy.TePolicy.WRAP, materializer);
demo.pub().subscribe(new Subscriber() {
Subscription s;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
s = subscription;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable t) {
System.out.println("onError: ");
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
demo.addUpstream(Source.range(201, 2000)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), materializer));
new Thread(() -> {
try {
Thread.sleep(333);
demo.emit("AAA");
demo.emit(5);
demo.emit(5);
demo.addUpstream(Flowable.range(1, 100));
demo.emit(5);
demo.emit(5);
demo.emit(5);
demo.emit(5);
Thread.sleep(3333);
demo.complete();
system.terminate();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//system.terminate();
}
@Override
public void setup() throws Exception {
super.setup();
harvesterFactory = new Harvester.Factory();
publisher = PublishProcessor.create();
}
@Test
public void shouldThrowExceptionBecauseFlowableEmittedUnexpectedItemAfterExpectedSequence() {
List<Throwable> errors = TestHelper.trackPluginErrors();
final Queue<String> expectedValues = new LinkedList<String>();
expectedValues.add("1");
expectedValues.add("2");
expectedValues.add("3");
final PublishProcessor<String> publishProcessor = PublishProcessor.create();
final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
@NonNull
@Override
public Disposable subscribe() {
return publishProcessor
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
onNextObtained(s);
}
});
}
};
final Disposable disposable = emissionChecker.subscribe();
publishProcessor.onNext("1");
publishProcessor.onNext("2");
publishProcessor.onNext("3");
emissionChecker.awaitNextExpectedValue();
emissionChecker.awaitNextExpectedValue();
emissionChecker.awaitNextExpectedValue();
emissionChecker.assertThatNoExpectedValuesLeft();
assertThat(errors).isEmpty();
publishProcessor.onNext("4");
assertThat(errors).hasSize(1);
assertThat(errors.get(0).getCause())
.hasMessage("Received emission, but no more emissions were expected: obtained 4, expectedValues = [], obtainedValues = []");
disposable.dispose();
}