下面列出了 io.reactivex.subjects.PublishSubject#onNext() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Registers two observers, both constructed with the same "foo" argument, on one group and
 * one subject, then verifies that only the observer registered last receives the emitted
 * value and the completion signal.
 */
@Test public void shouldOverrideExistingSubscriber() throws Exception {
  final ObservableGroup observableGroup = observableManager.newGroup();
  final PublishSubject<String> subject = PublishSubject.create();
  final TestAutoResubscribingObserver firstObserver = new TestAutoResubscribingObserver("foo");
  final TestAutoResubscribingObserver secondObserver = new TestAutoResubscribingObserver("foo");

  // Both registrations go through the same group, one after the other.
  subject.compose(observableGroup.transform(firstObserver)).subscribe(firstObserver);
  subject.compose(observableGroup.transform(secondObserver)).subscribe(secondObserver);

  subject.onNext("Ruben Aguirre");
  subject.onComplete();

  // The earlier observer is expected to have seen nothing at all.
  firstObserver.assertionTarget.assertNotComplete();
  firstObserver.assertionTarget.assertNoValues();

  // The later observer is expected to have seen the whole stream.
  secondObserver.assertionTarget.assertComplete();
  secondObserver.assertionTarget.assertValue("Ruben Aguirre");
}
/**
 * Registers two different subjects in the same group with observers built from the same
 * "foo" argument and verifies that only the second observer receives its subject's value
 * and completion, while the first observer receives no values.
 */
@Test public void shouldReplaceObservablesOfSameTagAndSameGroupId() {
  final ObservableGroup observableGroup = observableManager.newGroup();
  final PublishSubject<String> firstSubject = PublishSubject.create();
  final PublishSubject<String> secondSubject = PublishSubject.create();
  final TestAutoResubscribingObserver firstObserver = new TestAutoResubscribingObserver("foo");
  final TestAutoResubscribingObserver secondObserver = new TestAutoResubscribingObserver("foo");

  firstSubject.compose(observableGroup.transform(firstObserver)).subscribe(firstObserver);
  secondSubject.compose(observableGroup.transform(secondObserver)).subscribe(secondObserver);

  // fooObserver is declared outside this method; the group must report a live subscription for it.
  assertThat(observableGroup.subscription(fooObserver).isCancelled()).isFalse();
  assertThat(observableGroup.hasObservables(fooObserver)).isTrue();

  firstSubject.onNext("Hello World 1");
  firstSubject.onComplete();
  secondSubject.onNext("Hello World 2");
  secondSubject.onComplete();

  secondObserver.assertionTarget.awaitTerminalEvent();
  secondObserver.assertionTarget.assertComplete();
  secondObserver.assertionTarget.assertValue("Hello World 2");
  firstObserver.assertionTarget.assertNoValues();
}
/**
 * Verifies that values pushed into the subject after the subscription has been disposed are
 * not recorded by the consumer.
 */
@Test
public void disposePreventsFurtherEvents() throws Exception {
  final PublishSubject<Integer> upstream = PublishSubject.create();
  final EventSource<Integer> eventSource = RxEventSources.fromObservables(upstream);
  final RecordingConsumer<Integer> recorder = new RecordingConsumer<>();

  final Disposable subscription = eventSource.subscribe(recorder);

  upstream.onNext(1);
  upstream.onNext(2);

  subscription.dispose();

  // Emitted after dispose: must not show up in the recorded values.
  upstream.onNext(3);

  recorder.waitForChange(50);
  recorder.assertValues(1, 2);
}
/**
 * Registers two subjects in one group, disposes the group, and then lets both subjects emit
 * and complete. Neither observer may see completion, and the group must still report
 * observables for both observers.
 */
@Test public void shouldQueueMultipleRequests() throws Exception {
  final ObservableGroup observableGroup = observableManager.newGroup();

  final PublishSubject<String> firstSubject = PublishSubject.create();
  final TestObserver<String> firstObserver = new TestObserver<>();
  final PublishSubject<String> secondSubject = PublishSubject.create();
  final TestObserver<String> secondObserver = new TestObserver<>();

  firstSubject.compose(observableGroup.transform(firstObserver)).subscribe(firstObserver);
  secondSubject.compose(observableGroup.transform(secondObserver)).subscribe(secondObserver);

  // Everything below happens while the group is disposed.
  observableGroup.dispose();

  firstSubject.onNext("Chespirito");
  firstSubject.onComplete();
  secondSubject.onNext("Edgar Vivar");
  secondSubject.onComplete();

  firstObserver.assertNotComplete();
  secondObserver.assertNotComplete();
  assertThat(observableGroup.hasObservables(firstObserver)).isTrue();
  assertThat(observableGroup.hasObservables(secondObserver)).isTrue();
}
/**
 * Disposes the group, lets the source emit and complete, locks the group and resubscribes
 * with a fresh observer. Nothing may be delivered to that observer until the group is
 * unlocked; after unlocking it must receive the value and completion, and the group must no
 * longer report observables for it.
 */
@Test public void shouldNotDeliverEventsWhenResubscribedIfLocked() {
  final ObservableGroup observableGroup = observableManager.newGroup();
  final TestAutoResubscribingObserver initialObserver = new TestAutoResubscribingObserver("foo");
  final PublishSubject<String> subject = PublishSubject.create();

  subject.compose(observableGroup.transform(initialObserver)).subscribe(initialObserver);
  observableGroup.dispose();

  subject.onNext("Hello World");
  subject.onComplete();

  observableGroup.lock();

  // A brand-new observer built from the same "foo" argument picks the observable back up.
  final TestAutoResubscribingObserver resubscribedObserver =
      new TestAutoResubscribingObserver("foo");
  observableGroup.observable(resubscribedObserver).subscribe(resubscribedObserver);

  // Still locked: no events yet.
  resubscribedObserver.assertionTarget.assertNotComplete();
  resubscribedObserver.assertionTarget.assertNoValues();

  observableGroup.unlock();

  resubscribedObserver.assertionTarget.assertComplete();
  resubscribedObserver.assertionTarget.assertNoErrors();
  resubscribedObserver.assertionTarget.assertValue("Hello World");
  assertThat(observableGroup.hasObservables(resubscribedObserver)).isFalse();
}
/**
 * Drives {@code ensureEachCombined} from a trigger subject for a permission that is stubbed
 * as not yet granted, then reports a granted result and expects a single granted
 * {@code Permission} with no error and no terminal event.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscriptionCombined_trigger_granted() {
  final TestObserver<Permission> observer = new TestObserver<>();
  final String requestedPermission = Manifest.permission.READ_PHONE_STATE;
  when(mRxPermissions.isGranted(requestedPermission)).thenReturn(false);
  final int[] grantResults = new int[]{PackageManager.PERMISSION_GRANTED};
  final PublishSubject<Object> triggerSubject = PublishSubject.create();

  triggerSubject.compose(mRxPermissions.ensureEachCombined(requestedPermission)).subscribe(observer);

  // Fire the trigger, then feed the permission result back in.
  triggerSubject.onNext(1);
  mRxPermissions.onRequestPermissionsResult(new String[]{requestedPermission}, grantResults);

  observer.assertNoErrors();
  observer.assertNotTerminated();
  observer.assertValue(new Permission(requestedPermission, true));
}
/**
 * Emits two items before any view is attached and then attaches the view three times with
 * detaches in between. The observer is expected to hold exactly the values 2, 2, 2 with no
 * error and no completion.
 */
@Test
public void testDeliverLatestToView_SingleItemViewComesAndGoes() throws Exception {
  mPresenter.create();

  final PublishSubject<Integer> subject = PublishSubject.create();
  final TestObserver<Integer> observer = new TestObserver<>();

  subject
      .compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
      .subscribe(observer);

  // Both items are emitted while no view is attached.
  subject.onNext(1);
  subject.onNext(2);

  // attach -> detach -> attach -> detach -> attach
  mPresenter.attachView(mView);
  mPresenter.detachView();
  mPresenter.attachView(mView);
  mPresenter.detachView();
  mPresenter.attachView(mView);

  observer.assertNotComplete();
  observer.assertNoErrors();
  observer.assertValuesOnly(2, 2, 2);
}
/**
 * Checks that the inner subject of a {@code flatMap} gains an observer once the outer
 * subject emits, and loses it again when the outer subject signals an error.
 */
@Test
public void innerCancelled3() {
  final PublishSubject<Integer> outer = PublishSubject.create();
  final PublishSubject<Integer> inner = PublishSubject.create();

  outer
      .flatMap(ignored -> inner)
      .test();

  // The first outer item makes flatMap subscribe to the inner subject.
  outer.onNext(1);
  assertTrue("No subscribers?", inner.hasObservers());

  outer.onError(new Exception());
  assertFalse("Has subscribers?", inner.hasObservers());
}
/**
 * Handles a broadcast carrying {@link DownloadManager#EXTRA_DOWNLOAD_ID}.
 *
 * <p>Looks up the subject registered for that id in {@code subjectMap}; if there is none the
 * broadcast is ignored. Otherwise the download row is queried:
 * <ul>
 *   <li>no row (or no cursor at all): the download is removed and the subject receives an
 *       {@link IllegalStateException};</li>
 *   <li>status other than {@link DownloadManager#STATUS_SUCCESSFUL}: the download is removed
 *       and the subject receives an {@link IllegalStateException};</li>
 *   <li>success: the subject receives the value of {@link DownloadManager#COLUMN_LOCAL_URI}
 *       and then completes.</li>
 * </ul>
 * In every handled case the subject is removed from {@code subjectMap} afterwards.
 *
 * @param context the context the receiver is running in (unused here)
 * @param intent the received broadcast; the download id is read from its extras
 */
@Override
public void onReceive(Context context, Intent intent) {
  long id = intent.getLongExtra(DownloadManager.EXTRA_DOWNLOAD_ID, 0L);
  PublishSubject<String> publishSubject = subjectMap.get(id);
  if (publishSubject == null) {
    return;
  }

  DownloadManager.Query query = new DownloadManager.Query();
  query.setFilterById(id);
  DownloadManager downloadManager = getDownloadManager();

  // Exactly one of these is set once the cursor has been inspected.
  String failureMessage = null;
  String downloadedPackageUriString = null;

  Cursor cursor = downloadManager.query(query);
  try {
    // A null cursor is treated like an empty one instead of crashing with an NPE.
    if (cursor == null || !cursor.moveToFirst()) {
      failureMessage = "Cursor empty, this shouldn't happened";
    } else {
      int statusIndex = cursor.getColumnIndex(DownloadManager.COLUMN_STATUS);
      if (DownloadManager.STATUS_SUCCESSFUL != cursor.getInt(statusIndex)) {
        failureMessage = "Download Failed";
      } else {
        int uriIndex = cursor.getColumnIndex(DownloadManager.COLUMN_LOCAL_URI);
        downloadedPackageUriString = cursor.getString(uriIndex);
      }
    }
  } finally {
    // Close the cursor even if reading a column throws, and always before notifying.
    if (cursor != null) {
      cursor.close();
    }
  }

  if (failureMessage != null) {
    downloadManager.remove(id);
    publishSubject.onError(new IllegalStateException(failureMessage));
  } else {
    publishSubject.onNext(downloadedPackageUriString);
    publishSubject.onComplete();
  }
  subjectMap.remove(id);
}
/**
 * Verifies that a transformer built with {@code Transformers.fromConsumer} hands each
 * upstream item to the consumer as soon as it is emitted.
 */
@Test
public void effectPerformerInvokesConsumerAndPassesTheRequestedEffect() throws Exception {
  final PublishSubject<String> effects = PublishSubject.create();
  final TestConsumer<String> recordingConsumer = new TestConsumer<>();

  effects.compose(Transformers.fromConsumer(recordingConsumer)).subscribe();

  effects.onNext("First Time");
  assertThat(recordingConsumer.getCurrentValue(), is("First Time"));

  // A second emission must replace the previously observed value.
  effects.onNext("Do it again!");
  assertThat(recordingConsumer.getCurrentValue(), is("Do it again!"));
}
/**
 * Verifies that a transformer built with {@code Transformers.fromConsumer(consumer,
 * scheduler)} does not invoke the consumer until the scheduler's pending actions are run.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  final PublishSubject<String> effects = PublishSubject.create();
  final TestConsumer<String> recordingConsumer = new TestConsumer<>();
  final TestScheduler testScheduler = new TestScheduler();

  effects.compose(Transformers.fromConsumer(recordingConsumer, testScheduler)).subscribe();

  effects.onNext("First Time");
  // Nothing has run on the scheduler yet, so the consumer has no value.
  assertThat(recordingConsumer.getCurrentValue(), is(equalTo(null)));

  testScheduler.triggerActions();
  assertThat(recordingConsumer.getCurrentValue(), is("First Time"));
}
/**
 * Verifies that a transformer built with {@code Transformers.fromFunction} applies the
 * function to the upstream item and emits the result once the scheduler's actions are run.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
  final PublishSubject<String> effects = PublishSubject.create();
  final TestScheduler testScheduler = new TestScheduler();
  final Function<String, Integer> lengthOf = String::length;

  final TestObserver<Integer> results =
      effects.compose(Transformers.fromFunction(lengthOf, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  // "Hello" has five characters.
  results.assertValue(5);
}
/**
 * Locks the group before subscribing and verifies that neither the emitted value nor the
 * completion reaches the observer, while the group still reports observables for it.
 */
@Test public void shouldNotDeliverResultWhileLocked() throws Exception {
  final ObservableGroup observableGroup = observableManager.newGroup();
  final TestObserver<String> observer = new TestObserver<>();
  final PublishSubject<String> subject = PublishSubject.create();

  // The lock is taken before the subscription is made.
  observableGroup.lock();
  subject.compose(observableGroup.transform(observer)).subscribe(observer);

  subject.onNext("Chespirito");
  subject.onComplete();

  observer.assertNotComplete();
  observer.assertNoValues();
  assertThat(observableGroup.hasObservables(observer)).isTrue();
}
/**
 * Demonstration of the {@code debounceTime} transformer on a {@link TestScheduler}: six
 * values are pushed with controlled virtual-time gaps, and each passed, dropped, and
 * completion event is printed together with the scheduler's current time in milliseconds.
 * The method makes no assertions.
 */
@Test
public void test() {
  final PublishSubject<Integer> subject = PublishSubject.create();
  final TestScheduler testScheduler = new TestScheduler();

  subject.compose(debounceTime(10, TimeUnit.MILLISECONDS, testScheduler, dropped ->
          System.out.println(
              "Dropped: " + dropped + " @ T=" + testScheduler.now(TimeUnit.MILLISECONDS))))
      .subscribe(
          passed -> System.out.println(
              "Passed: " + passed + " @ T=" + testScheduler.now(TimeUnit.MILLISECONDS)),
          Throwable::printStackTrace,
          () -> System.out.println(
              "Done " + " @ T=" + testScheduler.now(TimeUnit.MILLISECONDS)));

  // A lone value followed by a long quiet period.
  subject.onNext(1);
  testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
  testScheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);

  // A burst of values spaced one millisecond apart.
  subject.onNext(2);
  testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  subject.onNext(3);
  testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  subject.onNext(4);
  testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  subject.onNext(5);
  testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
  testScheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);

  // One more lone value, another quiet period, then completion.
  subject.onNext(6);
  testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
  testScheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
  subject.onComplete();
}
/**
 * Demo: converts a {@link PublishSubject} to a Flowable with
 * {@link BackpressureStrategy#MISSING}, buffers items in groups of ten, observes them on the
 * computation scheduler and logs each buffer (or any error), then pushes one million
 * integers into the subject from the calling thread.
 */
private void demo4() {
  final int itemCount = 1000000;
  final PublishSubject<Integer> subject = PublishSubject.create();

  subject.toFlowable(BackpressureStrategy.MISSING)
      .buffer(10)
      .observeOn(Schedulers.computation())
      .subscribe(batch -> log("s", batch.toString()), this::log);

  // Emit 0 .. itemCount - 1 from the calling thread.
  for (int value = 0; value < itemCount; value++) {
    subject.onNext(value);
  }
}
/**
 * Subscribes an observer through the group, disposes the group, lets the source emit and
 * complete, and then destroys the group via the manager. Afterwards the group must not
 * report any observables for the observer that was actually registered.
 */
@Test public void shouldClearQueuedResults() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> sourceObservable = PublishSubject.create();
  TestObserver<String> subscriber1 = new TestObserver<>();

  sourceObservable.compose(group.transform(subscriber1)).subscribe(subscriber1);
  group.dispose();
  sourceObservable.onNext("Hello");
  sourceObservable.onComplete();
  observableManager.destroy(group);

  // subscriber1 is the observer registered above; checking only fooObserver (which this test
  // never registers) would pass even if destroy() left the observable behind.
  assertThat(group.hasObservables(subscriber1)).isEqualTo(false);
  // Original check, kept for backward compatibility.
  assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
/**
 * Disposes the group right after subscribing and verifies that the source's completion does
 * not reach the observer, while the group still reports observables for it.
 */
@Test public void shouldNotDeliverResultWhileUnsubscribed() throws Exception {
  final ObservableGroup observableGroup = observableManager.newGroup();
  final TestObserver<String> observer = new TestObserver<>();
  final PublishSubject<String> subject = PublishSubject.create();

  subject.compose(observableGroup.transform(observer)).subscribe(observer);
  observableGroup.dispose();

  // Emitted while the group is disposed.
  subject.onNext("Roberto Gomez Bolanos");
  subject.onComplete();

  observer.assertNotComplete();
  assertThat(observableGroup.hasObservables(observer)).isTrue();
}
/**
 * Requests {@code Long.MAX_VALUE - 2} twice from a reduce pipeline, so the total request
 * exceeds {@code Long.MAX_VALUE}, then emits 1 and 2 and expects the single value 3.
 */
@Test
public void testRequestOverflow() {
  final PublishSubject<Integer> source = PublishSubject.create();
  final long almostMax = Long.MAX_VALUE - 2;

  final TestSubscriber<Integer> subscriber = source.toFlowable(BackpressureStrategy.BUFFER) //
      .to(Transformers.reduce(reducer, 2, 5)) //
      .test(almostMax) //
      .requestMore(almostMax);

  source.onNext(1);
  source.onNext(2);
  source.onComplete();

  subscriber.assertValues(3);
}
/**
 * Adapts an {@link ObservableTransformer} to a {@link Connectable}.
 *
 * <p>Each call to {@code connect} creates a fresh {@link PublishSubject}, runs it through the
 * transformer and subscribes to the result: emitted items are forwarded to {@code output},
 * errors are handed to {@link RxJavaPlugins#onError}, and completion is ignored. Values
 * accepted by the returned connection are pushed into the subject; disposing the connection
 * disposes the subscription. The connectable is returned wrapped in a
 * {@code DiscardAfterDisposeConnectable}.
 *
 * @param transformer the transformer to adapt; checked with {@code checkNotNull}
 * @return a connectable backed by the transformer
 */
public static <I, O> Connectable<I, O> fromTransformer(
    final ObservableTransformer<I, O> transformer) {
  checkNotNull(transformer);

  final Connectable<I, O> transformerConnectable =
      new Connectable<I, O>() {
        @Nonnull
        @Override
        public Connection<I> connect(final Consumer<O> output) {
          final PublishSubject<I> inputs = PublishSubject.create();

          // Forwards every transformed item to the caller-supplied output.
          final io.reactivex.functions.Consumer<O> forwardToOutput =
              new io.reactivex.functions.Consumer<O>() {
                @Override
                public void accept(O e) {
                  output.accept(e);
                }
              };

          // Routes stream errors to the global RxJava error handler.
          final io.reactivex.functions.Consumer<Throwable> reportError =
              new io.reactivex.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                  RxJavaPlugins.onError(throwable);
                }
              };

          final Action onCompleted =
              new Action() {
                @Override
                public void run() throws Exception {
                  // TODO: complain loudly! shouldn't ever complete
                }
              };

          final Disposable subscription =
              inputs.compose(transformer).subscribe(forwardToOutput, reportError, onCompleted);

          return new Connection<I>() {
            @Override
            public void accept(I effect) {
              inputs.onNext(effect);
            }

            @Override
            public void dispose() {
              subscription.dispose();
            }
          };
        }
      };

  return new DiscardAfterDisposeConnectable<>(transformerConnectable);
}
/**
 * Demonstration of the {@code debounceOnly} transformer on a {@link TestScheduler}: strings
 * are pushed with controlled virtual-time gaps, using a predicate that matches values
 * starting with "A" and a 100 ms time window. Emitted items, errors, and completion are
 * printed. The method makes no assertions.
 */
@Test
public void test() {
  final PublishSubject<String> source = PublishSubject.create();
  final TestScheduler testScheduler = new TestScheduler();

  source.compose(
          debounceOnly(value -> value.startsWith("A"), 100, TimeUnit.MILLISECONDS, testScheduler))
      .subscribe(
          System.out::println,
          Throwable::printStackTrace,
          () -> System.out.println("Done"));

  source.onNext("A1");
  testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

  source.onNext("B1");
  testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

  source.onNext("C1");
  testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

  source.onNext("A2");
  testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);

  source.onNext("A3");
  testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

  source.onNext("A4");
  testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);

  source.onNext("B2");
  testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

  source.onNext("C2");
  testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

  source.onComplete();
}