下面列出了io.reactivex.subjects.PublishSubject#onComplete ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test public void shouldAutoResubscribeAfterUnlock() throws InterruptedException {
ObservableGroup group = observableManager.newGroup();
TestObserver<String> testObserver = new TestObserver<>();
PublishSubject<String> sourceObservable = PublishSubject.create();
group.lock();
sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);
sourceObservable.onNext("Chespirito");
sourceObservable.onComplete();
group.unlock();
testObserver.assertComplete();
testObserver.assertNoErrors();
testObserver.assertValue("Chespirito");
assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
/**
* The same observable tag can be used so long as it is associated with a different observer tag.
*/
@Test public void shouldNotReplaceObservableOfSameTagAndSameGroupIdAndDifferentObservers() {
ObservableGroup group = observableManager.newGroup();
PublishSubject<String> observable1 = PublishSubject.create();
TestObserver<String> observer1 = new TestObserver<>();
TestObserver<String> observer2 = new TestObserver<>();
String sharedObservableTag = "sharedTag";
observable1.compose(group.transform(observer1, sharedObservableTag)).subscribe(observer1);
observable1.compose(group.transform(observer2, sharedObservableTag)).subscribe(observer2);
assertThat(group.subscription(observer1, sharedObservableTag).isCancelled()).isFalse();
assertThat(group.hasObservables(observer1)).isTrue();
assertThat(group.subscription(observer2, sharedObservableTag).isCancelled()).isFalse();
assertThat(group.hasObservables(observer2)).isTrue();
observable1.onNext("Hello World 1");
observable1.onComplete();
observer2.assertComplete();
observer2.assertValue("Hello World 1");
observer1.assertComplete();
observer1.assertValue("Hello World 1");
}
@Test public void shouldQueueMultipleRequests() throws Exception {
ObservableGroup group = observableManager.newGroup();
PublishSubject<String> subject1 = PublishSubject.create();
TestObserver<String> testSubscriber1 = new TestObserver<>();
PublishSubject<String> subject2 = PublishSubject.create();
TestObserver<String> testSubscriber2 = new TestObserver<>();
subject1.compose(group.transform(testSubscriber1)).subscribe(testSubscriber1);
subject2.compose(group.transform(testSubscriber2)).subscribe(testSubscriber2);
group.dispose();
subject1.onNext("Chespirito");
subject1.onComplete();
subject2.onNext("Edgar Vivar");
subject2.onComplete();
testSubscriber1.assertNotComplete();
testSubscriber2.assertNotComplete();
assertThat(group.hasObservables(testSubscriber1)).isEqualTo(true);
assertThat(group.hasObservables(testSubscriber2)).isEqualTo(true);
}
@Test public void shouldReplaceObservablesOfSameTagAndSameGroupId() {
ObservableGroup group = observableManager.newGroup();
PublishSubject<String> observable1 = PublishSubject.create();
PublishSubject<String> observable2 = PublishSubject.create();
TestAutoResubscribingObserver observer1 = new TestAutoResubscribingObserver("foo");
TestAutoResubscribingObserver observer2 = new TestAutoResubscribingObserver("foo");
observable1.compose(group.transform(observer1)).subscribe(observer1);
observable2.compose(group.transform(observer2)).subscribe(observer2);
assertThat(group.subscription(fooObserver).isCancelled()).isFalse();
assertThat(group.hasObservables(fooObserver)).isTrue();
observable1.onNext("Hello World 1");
observable1.onComplete();
observable2.onNext("Hello World 2");
observable2.onComplete();
observer2.assertionTarget.awaitTerminalEvent();
observer2.assertionTarget.assertComplete();
observer2.assertionTarget.assertValue("Hello World 2");
observer1.assertionTarget.assertNoValues();
}
@Test public void testUnsubscribeWhenLocked() {
ObservableGroup group = observableManager.newGroup();
TestObserver<String> testObserver = new TestObserver<>();
PublishSubject<String> sourceObservable = PublishSubject.create();
sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);
group.lock();
group.dispose();
sourceObservable.onNext("Chespirito");
sourceObservable.onComplete();
group.unlock();
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertThat(group.hasObservables(testObserver)).isEqualTo(true);
}
private void doSomeWork() {
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
*/
connectableObservable.subscribe(getSecondObserver());
}
@Test public void testUnsubscribe() {
TestObserver<String> observer = new TestObserver<>();
PublishSubject<String> subject = PublishSubject.create();
SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
proxy.subscribe(observer);
proxy.dispose();
subject.onNext("Avanti!");
subject.onComplete();
assertThat(proxy.isDisposed()).isEqualTo(true);
observer.awaitTerminalEvent(10, TimeUnit.MILLISECONDS);
observer.assertNotComplete();
observer.assertNoValues();
}
@Test public void multipleSubscribesForSameObserverShouldBeIgnored() {
TestObserver<String> observer = new TestObserver<>();
PublishSubject<String> subject = PublishSubject.create();
SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
proxy.subscribe(observer);
proxy.subscribe(observer);
proxy.dispose();
subject.onNext("Avanti!");
subject.onComplete();
assertThat(proxy.isDisposed()).isEqualTo(true);
observer.awaitTerminalEvent(10, TimeUnit.MILLISECONDS);
observer.assertNotComplete();
observer.assertNoValues();
}
void onRequestPermissionsResult(String permissions[], int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
for (int i = 0, size = permissions.length; i < size; i++) {
log("onRequestPermissionsResult " + permissions[i]);
// Find the corresponding subject
PublishSubject<Permission> subject = mSubjects.get(permissions[i]);
if (subject == null) {
// No subject found
Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
return;
}
mSubjects.remove(permissions[i]);
boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED;
subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i]));
subject.onComplete();
}
}
@Test public void shouldDeliverQueuedEventsWhenResubscribed() throws Exception {
ObservableGroup group = observableManager.newGroup();
TestAutoResubscribingObserver resubscribingObserver = new TestAutoResubscribingObserver("foo");
PublishSubject<String> sourceObservable = PublishSubject.create();
sourceObservable.compose(group.transform(resubscribingObserver))
.subscribe(resubscribingObserver);
group.dispose();
sourceObservable.onNext("Hello World");
sourceObservable.onComplete();
resubscribingObserver.assertionTarget.assertNotComplete();
resubscribingObserver.assertionTarget.assertNoValues();
// TestObserver cannot be reused after being disposed in RxJava2
resubscribingObserver = new TestAutoResubscribingObserver("foo");
group.observable(resubscribingObserver).subscribe(resubscribingObserver);
resubscribingObserver.assertionTarget.assertComplete();
resubscribingObserver.assertionTarget.assertValue("Hello World");
assertThat(group.hasObservables(resubscribingObserver)).isEqualTo(false);
}
@Test public void shouldOverrideExistingSubscriber() throws Exception {
ObservableGroup group = observableManager.newGroup();
PublishSubject<String> sourceObservable = PublishSubject.create();
TestAutoResubscribingObserver testSubscriber1 = new TestAutoResubscribingObserver("foo");
TestAutoResubscribingObserver testSubscriber2 = new TestAutoResubscribingObserver("foo");
sourceObservable.compose(group.transform(testSubscriber1)).subscribe(testSubscriber1);
sourceObservable.compose(group.transform(testSubscriber2)).subscribe(testSubscriber2);
sourceObservable.onNext("Ruben Aguirre");
sourceObservable.onComplete();
testSubscriber1.assertionTarget.assertNotComplete();
testSubscriber1.assertionTarget.assertNoValues();
testSubscriber2.assertionTarget.assertComplete();
testSubscriber2.assertionTarget.assertValue("Ruben Aguirre");
}
void onRequestPermissionsResult(String permissions[], int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
for (int i = 0, size = permissions.length; i < size; i++) {
log("onRequestPermissionsResult " + permissions[i]);
// 找到相应的subject
PublishSubject<Permission> subject = mSubjects.get(permissions[i]);
if (subject == null) {
Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
return;
}
mSubjects.remove(permissions[i]);
boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED;
subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i]));
subject.onComplete();
}
}
@Override
public void onReceive(Context context, Intent intent) {
long id = intent.getLongExtra(DownloadManager.EXTRA_DOWNLOAD_ID, 0L);
PublishSubject<String> publishSubject = subjectMap.get(id);
if (publishSubject == null)
return;
DownloadManager.Query query = new DownloadManager.Query();
query.setFilterById(id);
DownloadManager downloadManager = getDownloadManager();
Cursor cursor = downloadManager.query(query);
if (!cursor.moveToFirst()) {
cursor.close();
downloadManager.remove(id);
publishSubject.onError(new IllegalStateException("Cursor empty, this shouldn't happened"));
subjectMap.remove(id);
return;
}
int statusIndex = cursor.getColumnIndex(DownloadManager.COLUMN_STATUS);
if (DownloadManager.STATUS_SUCCESSFUL != cursor.getInt(statusIndex)) {
cursor.close();
downloadManager.remove(id);
publishSubject.onError(new IllegalStateException("Download Failed"));
subjectMap.remove(id);
return;
}
int uriIndex = cursor.getColumnIndex(DownloadManager.COLUMN_LOCAL_URI);
String downloadedPackageUriString = cursor.getString(uriIndex);
cursor.close();
publishSubject.onNext(downloadedPackageUriString);
publishSubject.onComplete();
subjectMap.remove(id);
}
@Test public void shouldClearQueuedResults() throws Exception {
ObservableGroup group = observableManager.newGroup();
PublishSubject<String> sourceObservable = PublishSubject.create();
TestObserver<String> subscriber1 = new TestObserver<>();
sourceObservable.compose(group.transform(subscriber1)).subscribe(subscriber1);
group.dispose();
sourceObservable.onNext("Hello");
sourceObservable.onComplete();
observableManager.destroy(group);
assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
@Test public void shouldRemoveObservablesAfterTermination() throws Exception {
ObservableGroup group = observableManager.newGroup();
PublishSubject<String> sourceObservable = PublishSubject.create();
TestObserver<String> subscriber = new TestObserver<>();
sourceObservable.compose(group.transform(subscriber)).subscribe(subscriber);
sourceObservable.onNext("Roberto Gomez Bolanos is king");
sourceObservable.onComplete();
subscriber.assertComplete();
assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
@Test public void shouldNotDeliverResultWhileUnsubscribed() throws Exception {
ObservableGroup group = observableManager.newGroup();
TestObserver<String> testObserver = new TestObserver<>();
PublishSubject<String> sourceObservable = PublishSubject.create();
sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);
group.dispose();
sourceObservable.onNext("Roberto Gomez Bolanos");
sourceObservable.onComplete();
testObserver.assertNotComplete();
assertThat(group.hasObservables(testObserver)).isEqualTo(true);
}
@Test
void replay_PublishSubject_test() {
PublishSubject<Object> publishSubject = PublishSubject.create();
ConnectableObservable<Object> replay = publishSubject.replay();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> integers = new ArrayList<>();
for (int i=1;i<10;i++){
integers.add(i);
}
Disposable subscribe1 = replay.subscribe(x -> {
log("一郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe2 = replay.subscribe(x -> {
log("二郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe3 = replay.subscribe(x -> {
log("三郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
AtomicInteger atomicInteger = new AtomicInteger(integers.size());
try {
forkJoinPool.submit(() -> {
integers.forEach(id -> {
sleep(1,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
});
});
/* integers.forEach(id -> forkJoinPool.submit(() -> {
sleep(3,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
}));*/
replay.connect();
sleep(2,TimeUnit.SECONDS);
subscribe1.dispose();
sleep(1,TimeUnit.SECONDS);
//replay.connect(consumer -> consumer.dispose());
publishSubject.onComplete();
} finally {
try {
forkJoinPool.shutdown();
int shutdownDelaySec = 2;
System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
} finally {
System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
List<Runnable> l = forkJoinPool.shutdownNow();
System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
}
}
}
private void doSomeWork() {
PublishSubject<Integer> source = PublishSubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 and onComplete
source.onNext(1);
source.onNext(2);
source.onNext(3);
/*
* it will emit 4 and onComplete for second observer also.
*/
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete();
}
@Test
public void test() {
PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler sch = new TestScheduler();
ps.compose(bufferDebounce(200, TimeUnit.MILLISECONDS, sch))
.subscribe(
v -> System.out.println(sch.now(TimeUnit.MILLISECONDS)+ ": " + v),
Throwable::printStackTrace,
() -> System.out.println("Done"));
ps.onNext(1);
ps.onNext(2);
sch.advanceTimeTo(100, TimeUnit.MILLISECONDS);
ps.onNext(3);
sch.advanceTimeTo(150, TimeUnit.MILLISECONDS);
ps.onNext(4);
sch.advanceTimeTo(400, TimeUnit.MILLISECONDS);
ps.onNext(5);
sch.advanceTimeTo(450, TimeUnit.MILLISECONDS);
ps.onNext(6);
sch.advanceTimeTo(800, TimeUnit.MILLISECONDS);
ps.onNext(7);
ps.onComplete();
sch.advanceTimeTo(850, TimeUnit.MILLISECONDS);
}
@Test
public void test() {
PublishSubject<String> subject = PublishSubject.create();
TestScheduler sch = new TestScheduler();
subject.compose(debounceOnly(v -> v.startsWith("A"), 100, TimeUnit.MILLISECONDS, sch))
.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
subject.onNext("A1");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
subject.onNext("B1");
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
subject.onNext("C1");
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
subject.onNext("A2");
sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);
subject.onNext("A3");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
subject.onNext("A4");
sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);
subject.onNext("B2");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
subject.onNext("C2");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
subject.onComplete();
}