下面列出了怎么用io.reactivex.subjects.PublishSubject的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Verifies that a trigger emission composed with {@code ensure(permission)} produces
 * {@code true}, without terminating, once the permission result reports
 * {@code PERMISSION_GRANTED}.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void subscription_trigger_granted() {
    final String permission = Manifest.permission.READ_PHONE_STATE;
    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    // The permission is reported as not yet granted, so a request result is needed.
    when(mRxPermissions.isGranted(permission)).thenReturn(false);

    final TestObserver<Boolean> observer = new TestObserver<>();
    final PublishSubject<Object> trigger = PublishSubject.create();
    trigger.compose(mRxPermissions.ensure(permission)).subscribe(observer);

    trigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(true);
}
/**
 * Exercises {@code deliverLatestToView} while the view is attached, detached and attached
 * again, then checks exactly which values reached the observer.
 */
@Test
public void testDeliverLatestToView_ViewComesAndGoes() throws Exception {
    mPresenter.create();

    final TestObserver<Integer> observer = new TestObserver<>();
    final PublishSubject<Integer> emitter = PublishSubject.create();
    emitter.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
            .subscribe(observer);

    // Emitted before any view has been attached.
    emitter.onNext(1);
    emitter.onNext(2);

    mPresenter.attachView(mView);
    emitter.onNext(3);

    // Emitted while the view is detached.
    mPresenter.detachView();
    emitter.onNext(4);
    emitter.onNext(5);

    mPresenter.attachView(mView);
    emitter.onNext(6);

    observer.assertNotComplete();
    observer.assertNoErrors();
    // Of the values emitted without a view, only the last of each phase (2 and 5) is expected.
    observer.assertValuesOnly(2, 3, 5, 6);
}
/**
 * Checks that registering a transformer on the builder after {@code build()} does not affect
 * the router that was already built: a {@code B} effect must still end in an
 * {@code UnknownEffectException}.
 */
@Test
public void effectHandlersShouldBeImmutable() throws Exception {
    // Replace the shared fixtures with fresh instances for this test case.
    publishSubject = PublishSubject.create();
    testSubscriber = TestObserver.create();

    SubtypeEffectHandlerBuilder<TestEffect, TestEvent> handlerBuilder =
        RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
            .addTransformer(A.class, (Observable<A> as) -> as.map(a -> AEvent.create(a.id())));
    ObservableTransformer<TestEffect, TestEvent> router = handlerBuilder.build();

    // Added after build(): the router above must not become able to handle B effects.
    handlerBuilder.addTransformer(B.class, bs -> bs.map(b -> BEvent.create(b.id())));

    publishSubject.compose(router).subscribe(testSubscriber);

    B unhandled = B.create(84);
    publishSubject.onNext(unhandled);
    publishSubject.onComplete();

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertError(new UnknownEffectException(unhandled));
}
/**
 * Checks that the observable of a plain {@code TestObserver} is no longer held by the group
 * after the lifecycle manager is destroyed by an activity that is not finishing.
 */
@Test public void testNonResubscribableObservablesRemovedAfterNonFinishingDestroy() {
    when(observableManager.newGroup()).thenReturn(new ObservableGroup(1));
    GroupLifecycleManager manager =
            GroupLifecycleManager.onCreate(observableManager, null, target);

    TestObserver<String> observer = new TestObserver<>();
    PublishSubject<String> source = PublishSubject.create();
    source.compose(manager.group().transform(observer)).subscribe(observer);
    assertThat(manager.group().hasObservables(observer)).isTrue();

    // Simulate a rotation: state is saved, then the activity is destroyed without finishing.
    Activity rotatingActivity = mock(Activity.class);
    when(rotatingActivity.isFinishing()).thenReturn(false);
    manager.onSaveInstanceState(new Bundle());
    manager.onDestroy(rotatingActivity);

    assertThat(manager.group().hasObservables(observer)).isFalse();
}
/**
 * Subscribes to {@code api.updateBalance()} with a delayed repeat handler, a delayed retry
 * handler and a second repeat handler fed by {@code subject}, then adds the resulting
 * disposable to {@code compositeDisposable}.
 *
 * @param subject source whose emissions drive the final {@code repeatWhen} handler
 */
private void startListenBalance(final PublishSubject<Object> subject){
    final Disposable balanceUpdates = api.updateBalance()
            .repeatWhen(completions ->
                    completions.delay(BuildConfig.UPDATE_BALANCE_SECONDS_DELAY, TimeUnit.SECONDS))
            .retryWhen(errors ->
                    errors.delay(BuildConfig.UPDATE_BALANCE_SECONDS_DELAY, TimeUnit.SECONDS))
            .repeatWhen(completions ->
                    completions.flatMap(ignored -> subject.toFlowable(BackpressureStrategy.LATEST)))
            .subscribe();
    compositeDisposable.add(balanceUpdates);
}
/**
 * Creates {@code subject} and subscribes a consumer, observed on the scheduler obtained from
 * {@code EventThread.getScheduler(thread)}, that passes each event to {@code handleEvent}
 * as long as {@code valid} is set.
 */
private void initObservable() {
    subject = PublishSubject.create();
    subject.observeOn(EventThread.getScheduler(thread))
            .subscribe(new Consumer() {
                @Override
                public void accept(Object event) {
                    if (!valid) {
                        // Events are dropped while this subscriber is not valid.
                        return;
                    }
                    try {
                        handleEvent(event);
                    } catch (InvocationTargetException e) {
                        // Hand the failure on together with a description of the event.
                        throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberEvent.this, e);
                    }
                }
            });
}
/**
 * Creates a fresh {@code submitResults} subject, registers the activity lifecycle hook and
 * registers a callback with {@code mSyncAuthClient} that routes sign-in and sign-out results
 * to their processing methods.
 *
 * @param activity activity passed to the lifecycle and callback registration
 * @return the newly created {@code submitResults} subject
 */
@Override
public Observable<RxResult> registerListener(FragmentActivity activity) {
    submitResults = PublishSubject.create();
    registerActivityLifeCycle(activity);
    mSyncAuthClient.registerCallbackIfInterrupt(
            activity,
            (result, type) -> {
                switch (type) {
                    case SIGN_IN:
                        processLogInResult(result);
                        return;
                    case SIGN_OUT:
                        processSignOutResult(result);
                        return;
                    default:
                        // Any other callback type is ignored.
                        return;
                }
            },
            Executors.newSingleThreadExecutor());
    return submitResults;
}
/**
 * Checks that a value and completion emitted after the group was disposed do not reach the
 * original observer, but are delivered to a new observer with the same tag once it
 * subscribes through {@code group.observable(...)}.
 */
@Test public void shouldDeliverQueuedEventsWhenResubscribed() throws Exception {
    ObservableGroup group = observableManager.newGroup();
    TestAutoResubscribingObserver observer = new TestAutoResubscribingObserver("foo");
    PublishSubject<String> source = PublishSubject.create();
    source.compose(group.transform(observer)).subscribe(observer);
    group.dispose();

    source.onNext("Hello World");
    source.onComplete();

    // Nothing may have reached the observer that was subscribed before dispose().
    observer.assertionTarget.assertNotComplete();
    observer.assertionTarget.assertNoValues();

    // TestObserver cannot be reused after being disposed in RxJava2
    observer = new TestAutoResubscribingObserver("foo");
    group.observable(observer).subscribe(observer);

    observer.assertionTarget.assertComplete();
    observer.assertionTarget.assertValue("Hello World");
    assertThat(group.hasObservables(observer)).isFalse();
}
/**
 * Demonstrates {@code replay(3)} on a {@code PublishSubject}: the first observer subscribes
 * before any emission, the second only after the source has emitted 1..4 and completed.
 */
private void doSomeWork() {
    final PublishSubject<Integer> source = PublishSubject.create();
    // bufferSize = 3 retains the three most recent values for replay
    final ConnectableObservable<Integer> replayed = source.replay(3);
    replayed.connect(); // connect the replaying observable to the source
    replayed.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        source.onNext(value);
    }
    source.onComplete();

    // The late subscriber is replayed 2, 3 and 4: the three retained values.
    replayed.subscribe(getSecondObserver());
}
/**
 * Checks that when two observers with the same tag are added to one group, only the second
 * one receives its value and completion while the first receives nothing.
 */
@Test public void shouldReplaceObservablesOfSameTagAndSameGroupId() {
    ObservableGroup group = observableManager.newGroup();
    TestAutoResubscribingObserver observer1 = new TestAutoResubscribingObserver("foo");
    TestAutoResubscribingObserver observer2 = new TestAutoResubscribingObserver("foo");
    PublishSubject<String> first = PublishSubject.create();
    PublishSubject<String> second = PublishSubject.create();

    first.compose(group.transform(observer1)).subscribe(observer1);
    second.compose(group.transform(observer2)).subscribe(observer2);

    // NOTE(review): fooObserver is a fixture declared outside this method; presumably it
    // carries the same "foo" tag as the observers above - confirm against the test class.
    assertThat(group.subscription(fooObserver).isCancelled()).isFalse();
    assertThat(group.hasObservables(fooObserver)).isTrue();

    first.onNext("Hello World 1");
    first.onComplete();
    second.onNext("Hello World 2");
    second.onComplete();

    observer2.assertionTarget.awaitTerminalEvent();
    observer2.assertionTarget.assertComplete();
    observer2.assertionTarget.assertValue("Hello World 2");
    observer1.assertionTarget.assertNoValues();
}
/**
 * Checks that disposing one group leaves an observer of a second group untouched: the
 * second observer still receives its value without errors, while the first never completes.
 */
@Test public void shouldNotRemoveSubscribersForOtherIds() throws Exception {
    ObservableGroup disposedGroup = observableManager.newGroup();
    ObservableGroup liveGroup = observableManager.newGroup();
    PublishSubject<String> disposedSource = PublishSubject.create();
    PublishSubject<String> liveSource = PublishSubject.create();
    TestAutoResubscribingObserver disposedObserver = new TestAutoResubscribingObserver("foo");
    TestAutoResubscribingObserver liveObserver = new TestAutoResubscribingObserver("bar");

    disposedSource.compose(disposedGroup.transform(disposedObserver)).subscribe(disposedObserver);
    liveSource.compose(liveGroup.transform(liveObserver)).subscribe(liveObserver);

    // Only the first group is disposed.
    disposedGroup.dispose();

    disposedSource.onNext("Florinda Mesa");
    disposedSource.onComplete();
    liveSource.onNext("Carlos Villagran");
    liveSource.onComplete();

    disposedObserver.assertionTarget.assertNotComplete();
    liveObserver.assertionTarget.assertNoErrors();
    liveObserver.assertionTarget.assertValue("Carlos Villagran");
}
/**
 * Checks that after the group is disposed neither of two observers sees a completion, while
 * the group still reports observables for both of them.
 */
@Test public void shouldQueueMultipleRequests() throws Exception {
    ObservableGroup group = observableManager.newGroup();
    PublishSubject<String> firstSource = PublishSubject.create();
    PublishSubject<String> secondSource = PublishSubject.create();
    TestObserver<String> firstObserver = new TestObserver<>();
    TestObserver<String> secondObserver = new TestObserver<>();

    firstSource.compose(group.transform(firstObserver)).subscribe(firstObserver);
    secondSource.compose(group.transform(secondObserver)).subscribe(secondObserver);
    group.dispose();

    firstSource.onNext("Chespirito");
    firstSource.onComplete();
    secondSource.onNext("Edgar Vivar");
    secondSource.onComplete();

    firstObserver.assertNotComplete();
    secondObserver.assertNotComplete();
    assertThat(group.hasObservables(firstObserver)).isTrue();
    assertThat(group.hasObservables(secondObserver)).isTrue();
}
/**
 * Verifies that the subject backing the stderr listener has been completed by the time the
 * execution promise is resolved (allowing a short grace period for the stderr thread).
 *
 * <p>An interrupt during the grace period fails the test instead of silently skipping the
 * assertion.
 */
@Test
public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable {
    final JobExecutor jobExecutor = getInstance();
    final AtomicBoolean completedCalled = new AtomicBoolean(false);
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe();
    final JobEventListeners listeners = createStderrListener(stderrSubject);
    final CancelablePromise<JobExecutionResult> ret =
            jobExecutor.execute(STANDARD_REQUEST, listeners);
    promiseAssert(ret, result -> {
        try {
            // The stderr thread can race with the exit thread
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and fail loudly: without this the
            // completion check below would be skipped and the test would pass vacuously.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted before stderr completion could be verified", e);
        }
        assertThat(completedCalled.get()).isTrue();
    });
}
/**
 * Checks that an observer subscribed while the group is locked receives the value and the
 * completion emitted during the lock once {@code unlock()} is called, and that the group no
 * longer reports observables for that observer afterwards.
 */
@Test public void shouldAutoResubscribeAfterUnlock() throws InterruptedException {
    ObservableGroup group = observableManager.newGroup();
    TestObserver<String> testObserver = new TestObserver<>();
    PublishSubject<String> sourceObservable = PublishSubject.create();

    group.lock();
    sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);

    // Emitted while the group is locked.
    sourceObservable.onNext("Chespirito");
    sourceObservable.onComplete();

    group.unlock();

    testObserver.assertComplete();
    testObserver.assertNoErrors();
    testObserver.assertValue("Chespirito");
    // Query the observer this test actually subscribed. The shared fooObserver fixture is
    // never subscribed here, so asking about it says nothing about testObserver's observable.
    assertThat(group.hasObservables(testObserver)).isEqualTo(false);
}
/**
 * Runs an {@code echo} job whose argument is a {@code join(',', inputs.someList)} template
 * expression and checks that the text collected from stdout is {@code a,b,c,d}.
 *
 * <p>The test waits for the stdout subject to complete and fails explicitly if that does not
 * happen within the timeout, rather than going on to compare partial output.
 */
@Test
public void testExecuteEvaluatesJoinAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${join(',', inputs.someList)}");

    // Accumulate every chunk written to stdout.
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // Starts with no permits; the single permit is released when stdout completes.
    final Semaphore stdoutCompleted = new Semaphore(0);
    stdoutSubject.doOnComplete(stdoutCompleted::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);
    jobExecutor.execute(req, listeners);

    // tryAcquire returns false on timeout; that must fail the test here, not later.
    final boolean completedInTime =
            stdoutCompleted.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);
    assertThat(completedInTime).isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
    assertThat(stringFromStdout).isEqualTo("a,b,c,d"); // From the input fixture
}
/**
 * Verifies that bytes pushed into the executor's stdout subject are seen by a subscriber of
 * {@code jobManager.stdoutUpdates(...)} for the submitted job.
 */
@Test
public void testGetStdoutUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final JobExecutor executor = MockJobExecutor.thatUses(
            executorPromise,
            stdoutSubject,
            Observable.just(TestHelpers.generateRandomBytes()));
    final JobManager jobManager = createManagerWith(executor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission =
            jobManager.submit(STANDARD_VALID_REQUEST);

    // Capture whatever the manager's stdout stream for this job emits.
    final AtomicReference<byte[]> observedBytes = new AtomicReference<>();
    final Observable<byte[]> stdoutUpdates =
            jobManager.stdoutUpdates(submission.getLeft()).get();
    stdoutUpdates.subscribe(observedBytes::set);

    final byte[] expectedBytes = TestHelpers.generateRandomBytes();
    stdoutSubject.onNext(expectedBytes);

    // Resolve the execution and wait for the job to be finalized before asserting.
    executorPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(observedBytes.get()).isEqualTo(expectedBytes);
}
/**
 * Verifies that bytes pushed into the executor's stderr subject are seen by a subscriber of
 * {@code jobManager.stderrUpdates(...)} for the submitted job.
 */
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final JobExecutor executor = MockJobExecutor.thatUses(
            executorPromise,
            Observable.just(TestHelpers.generateRandomBytes()),
            stderrSubject);
    final JobManager jobManager = createManagerWith(executor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission =
            jobManager.submit(STANDARD_VALID_REQUEST);

    // Capture whatever the manager's stderr stream for this job emits.
    final AtomicReference<byte[]> observedBytes = new AtomicReference<>();
    final Observable<byte[]> stderrUpdates =
            jobManager.stderrUpdates(submission.getLeft()).get();
    stderrUpdates.subscribe(observedBytes::set);

    final byte[] expectedBytes = TestHelpers.generateRandomBytes();
    stderrSubject.onNext(expectedBytes);

    // Resolve the execution and wait for the job to be finalized before asserting.
    executorPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(observedBytes.get()).isEqualTo(expectedBytes);
}
/**
 * Forwards an activity result to whichever subject and/or callback was registered under
 * {@code requestCode}; each registration is removed as it is served.
 *
 * @param requestCode key under which the subject or callback was stored
 * @param resultCode result code handed on to the receiver
 * @param data result intent handed on to the receiver
 */
@Override
public void onActivityResult(int requestCode, int resultCode, Intent data) {
    super.onActivityResult(requestCode, resultCode, data);

    // RxJava-style delivery: emit the result once, then complete the subject.
    final PublishSubject<ActivityResultInfo> pendingSubject = mSubjects.remove(requestCode);
    if (pendingSubject != null) {
        pendingSubject.onNext(new ActivityResultInfo(resultCode, data));
        pendingSubject.onComplete();
    }

    // Callback-style delivery.
    final AvoidOnResult.Callback pendingCallback = mCallbacks.remove(requestCode);
    if (pendingCallback != null) {
        pendingCallback.onActivityResult(resultCode, data);
    }
}
/**
 * Verifies that a trigger emission composed with {@code ensureEach(permission)} produces a
 * granted {@code Permission}, without terminating, once the permission result reports
 * {@code PERMISSION_GRANTED}.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscription_trigger_granted() {
    final String permission = Manifest.permission.READ_PHONE_STATE;
    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    // The permission is reported as not yet granted, so a request result is needed.
    when(mRxPermissions.isGranted(permission)).thenReturn(false);

    final TestObserver<Permission> observer = new TestObserver<>();
    final PublishSubject<Object> trigger = PublishSubject.create();
    trigger.compose(mRxPermissions.ensureEach(permission)).subscribe(observer);

    trigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(new Permission(permission, true));
}
/**
 * Checks that the inner source of a {@code concatMap} has an observer after the outer source
 * emits, and loses it again when the outer source signals an error.
 */
@Test
public void innerCancelled4() {
    PublishSubject<Integer> outer = PublishSubject.create();
    PublishSubject<Integer> inner = PublishSubject.create();

    outer.concatMap(ignored -> inner).test();

    outer.onNext(1);
    assertTrue("No subscribers?", inner.hasObservers());

    outer.onError(new Exception());
    assertFalse("Has subscribers?", inner.hasObservers());
}
/**
 * Reads {@code input} in chunks and publishes each chunk to {@code subject} as a string.
 * The subject is completed at end of stream or when the thread is found interrupted, and
 * receives {@code onError} if reading throws an {@link IOException}.
 *
 * @param input stream to read, decoded using the {@code encoding} field
 * @param subject receiver of the decoded chunks and of the terminal signal
 */
private void monitor(InputStream input, PublishSubject<String> subject) {
    final int chunkSize = 4096;
    final char[] chunk = new char[chunkSize];
    final BufferedReader reader = new BufferedReader(new InputStreamReader(input, encoding));
    try {
        for (;;) {
            // Thread.interrupted() also clears the interrupt flag.
            if (Thread.interrupted()) {
                subject.onComplete();
                return;
            }
            final int count = reader.read(chunk, 0, chunkSize);
            if (count == -1) {
                // End of stream.
                subject.onComplete();
                return;
            }
            subject.onNext(new String(chunk, 0, count));
        }
    } catch (IOException e) {
        subject.onError(e);
    }
}
/**
 * Creates a client around the given connection provider and initialises the stream map,
 * the lifecycle subject, the path matcher and the heart-beat task.
 *
 * @param connectionProvider provider stored in {@code this.connectionProvider}
 */
public StompClient(ConnectionProvider connectionProvider) {
    this.connectionProvider = connectionProvider;
    this.streamMap = new ConcurrentHashMap<>();
    this.lifecyclePublishSubject = PublishSubject.create();
    this.pathMatcher = new SimplePathMatcher();
    // The second callback publishes a FAILED_SERVER_HEARTBEAT lifecycle event.
    this.heartBeatTask = new HeartBeatTask(
            this::sendHeartBeat,
            () -> lifecyclePublishSubject.onNext(
                    new LifecycleEvent(LifecycleEvent.Type.FAILED_SERVER_HEARTBEAT)));
}
/**
 * Checks that a transformer built with {@code Transformers.fromAction} runs the action once
 * for every upstream emission.
 */
@Test
public void effectPerformerRunsActionWheneverEffectIsRequested() throws Exception {
    TestAction action = new TestAction();
    PublishSubject<String> effects = PublishSubject.create();
    effects.compose(Transformers.fromAction(action)).subscribe();

    effects.onNext("First Time");
    assertThat(action.getRunCount(), is(1));

    effects.onNext("One more!");
    assertThat(action.getRunCount(), is(2));
}
/**
 * Checks that a transformer built with {@code Transformers.fromAction(action, scheduler)}
 * does not run the action until the test scheduler triggers its pending work.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
    TestScheduler scheduler = new TestScheduler();
    TestAction action = new TestAction();
    PublishSubject<String> effects = PublishSubject.create();
    effects.compose(Transformers.fromAction(action, scheduler)).subscribe();

    effects.onNext("First Time");
    // Nothing has run yet: the work is still waiting on the scheduler.
    assertThat(action.getRunCount(), is(0));

    scheduler.triggerActions();
    assertThat(action.getRunCount(), is(1));
}
/**
 * Checks that when the function given to {@code Transformers.fromFunction} throws, the
 * resulting stream signals a {@code RuntimeException} to its observer.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
    TestScheduler scheduler = new TestScheduler();
    Function<String, Integer> alwaysFailing =
        effect -> {
            throw new RuntimeException("Something bad happened");
        };
    PublishSubject<String> effects = PublishSubject.create();
    TestObserver<Integer> observer =
        effects.compose(Transformers.fromFunction(alwaysFailing, scheduler)).test();

    effects.onNext("Hello");
    scheduler.triggerActions();

    observer.assertError(RuntimeException.class);
}
/**
 * Checks that, with the RxJava error handler set to {@code null}, an exception thrown by an
 * effect handler function is delivered to the subscriber as that same exception instance.
 */
@Test
public void shouldHandleNullRxJavaErrorHandler() throws Exception {
    // given no RxJava error handler
    RxJavaPlugins.setErrorHandler(null);

    // and a router whose handler for A effects always throws
    final RuntimeException expected = new RuntimeException("expected!");
    publishSubject = PublishSubject.create();
    testSubscriber = TestObserver.create();
    ObservableTransformer<TestEffect, TestEvent> router =
        RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
            .addFunction(
                A.class,
                a -> {
                  throw expected;
                })
            .build();
    publishSubject.compose(router).subscribe(testSubscriber);

    // when an event is sent, it doesn't crash (the exception does get printed to stderr)
    publishSubject.onNext(A.create(1));

    // then the very same exception instance is forwarded to the test subscriber
    testSubscriber.assertError(error -> error == expected);
}
/**
 * Creates a new subject, stores it in {@code mItemViewLongClickBinding} under the given
 * renderer class, and returns it.
 *
 * @param clazz renderer class used as the binding key
 * @return the newly created subject
 */
@SuppressWarnings("unused")
public final <X extends AutoViewHolder, Y extends IRenderer<X>> PublishSubject<ItemInfo<Y, X>>
longClicks(@NonNull final Class<Y> clazz) {
    final PublishSubject<ItemInfo<Y, X>> longClickEvents = PublishSubject.create();
    mItemViewLongClickBinding.put(clazz, longClickEvents);
    return longClickEvents;
}
/**
 * Checks that an observer whose group was disposed does not see the completion emitted
 * afterwards, while the group still reports observables for it.
 */
@Test public void shouldNotDeliverResultWhileUnsubscribed() throws Exception {
    ObservableGroup group = observableManager.newGroup();
    TestObserver<String> observer = new TestObserver<>();
    PublishSubject<String> source = PublishSubject.create();
    source.compose(group.transform(observer)).subscribe(observer);
    group.dispose();

    // Emitted after the group has been disposed.
    source.onNext("Roberto Gomez Bolanos");
    source.onComplete();

    observer.assertNotComplete();
    assertThat(group.hasObservables(observer)).isTrue();
}
/**
 * Injected constructor: stores the collaborators, creates the request subjects and wires the
 * view model's {@code LiveData} fields to streams from the supplied repositories.
 *
 * @param projectRepository source of the active-project stream
 * @param featureRepository stored in {@code this.featureRepository}
 * @param locationManager stored in {@code this.locationManager}
 * @param offlineAreaRepository source of the downloaded-tiles stream
 */
@Inject
MapContainerViewModel(
ProjectRepository projectRepository,
FeatureRepository featureRepository,
LocationManager locationManager,
OfflineAreaRepository offlineAreaRepository) {
this.featureRepository = featureRepository;
this.locationManager = locationManager;
this.locationLockChangeRequests = PublishSubject.create();
this.cameraUpdateSubject = PublishSubject.create();
// share() lets the one lock-state stream feed both locationLockState and the camera-update
// flowable built below.
// NOTE(review): the helper presumably reads the fields assigned above - keep this order.
Flowable<BooleanOrError> locationLockStateFlowable = createLocationLockStateFlowable().share();
// The exposed lock state starts with BooleanOrError.falseValue().
this.locationLockState =
LiveDataReactiveStreams.fromPublisher(
locationLockStateFlowable.startWith(BooleanOrError.falseValue()));
this.cameraUpdateRequests =
LiveDataReactiveStreams.fromPublisher(
createCameraUpdateFlowable(locationLockStateFlowable));
this.cameraPosition = new MutableLiveData<>();
this.activeProject =
LiveDataReactiveStreams.fromPublisher(projectRepository.getActiveProjectOnceAndStream());
// TODO: Clear feature markers when project is deactivated.
// TODO: Since we depend on project stream from repo anyway, this transformation can be moved
// into the repo?
// Each active-project emission is switched to its features stream and converted to map pins.
this.mapPins =
LiveDataReactiveStreams.fromPublisher(
projectRepository
.getActiveProjectOnceAndStream()
.map(Loadable::value)
.switchMap(this::getFeaturesStream)
.map(MapContainerViewModel::toMapPins));
// Each emitted set of downloaded tiles is reduced to an immutable set of the tiles' paths.
this.mbtilesFilePaths =
LiveDataReactiveStreams.fromPublisher(
offlineAreaRepository
.getDownloadedTilesOnceAndStream()
.map(set -> stream(set).map(Tile::getPath).collect(toImmutableSet())));
}
/**
 * Injected constructor: stores the collaborators, creates the {@code LiveData} holders and
 * subscribes to add-feature clicks so that each clicked feature is saved and then shown in
 * the bottom sheet.
 *
 * @param projectRepository stored, and source of the active-project stream
 * @param featureRepository used to save each feature emitted by {@code addFeatureClicks}
 * @param authManager supplies the current user passed to {@code saveFeature}
 * @param navigator stored in {@code this.navigator}
 * @param schedulers supplies the scheduler the save result is observed on
 */
@Inject
HomeScreenViewModel(
    ProjectRepository projectRepository,
    FeatureRepository featureRepository,
    AuthenticationManager authManager,
    Navigator navigator,
    Schedulers schedulers) {
    // Injected collaborators.
    this.projectRepository = projectRepository;
    this.navigator = navigator;

    // Observable UI state.
    this.addFeatureDialogRequests = new MutableLiveData<>();
    this.openDrawerRequests = new MutableLiveData<>();
    this.bottomSheetState = new MutableLiveData<>();
    this.activeProject =
        LiveDataReactiveStreams.fromPublisher(projectRepository.getActiveProjectOnceAndStream());

    // Save every clicked feature, then show it in the bottom sheet.
    this.addFeatureClicks = PublishSubject.create();
    disposeOnClear(
        addFeatureClicks
            .switchMapSingle(
                feature ->
                    featureRepository
                        .saveFeature(feature, authManager.getCurrentUser())
                        .toSingleDefault(feature)
                        .doOnError(this::onAddFeatureError)
                        .onErrorResumeNext(Single.never())) // Prevent from breaking upstream.
            .observeOn(schedulers.ui())
            .subscribe(this::showBottomSheet));
}