下面列出了 io.reactivex.subjects.PublishSubject#create() 的实例代码。您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Emits one value and a completion while the group is locked, unlocks the group, and then
 * expects the observer to have terminated without errors holding exactly that value.
 */
@Test public void shouldAutoResubscribeAfterLockAndUnlock() {
    ObservableGroup lockableGroup = observableManager.newGroup();
    PublishSubject<String> source = PublishSubject.create();
    TestObserver<String> observer = new TestObserver<>();
    source.compose(lockableGroup.transform(observer)).subscribe(observer);

    // Everything emitted between lock() and unlock() happens while the group is locked.
    lockableGroup.lock();
    source.onNext("Chespirito");
    source.onComplete();
    lockableGroup.unlock();

    observer.assertTerminated()
            .assertNoErrors()
            .assertValue("Chespirito");

    // NOTE(review): this checks fooObserver (declared outside this method), not the observer
    // subscribed above -- confirm that is the intended subject of the assertion.
    assertThat(lockableGroup.hasObservables(fooObserver)).isEqualTo(false);
}
/**
 * Runs {@code echo ${outputDir}} through the executor, collects everything written to stdout,
 * and checks that the echoed text is a path that exists on the filesystem.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for stdout to complete
 */
@Test
public void testExecuteEvaluatesOutputDirAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${outputDir}");

    // Accumulate every chunk written to stdout.
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken up front, so the next acquire blocks until stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);
    jobExecutor.execute(req, listeners);

    // Fail here on a timeout instead of carrying on with partial output.
    final boolean stdoutCompleted = s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);
    assertThat(stdoutCompleted).isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    // An empty string would resolve to the current directory and pass the existence check.
    assertThat(stringFromStdout).isNotEmpty();
    // assertThat(boolean) alone verifies nothing; the terminal isTrue() performs the check.
    assertThat(Files.exists(Paths.get(stringFromStdout))).isTrue();
}
/**
 * With the permission stubbed as not granted, fires the trigger through
 * {@code ensureEach}, delivers a PERMISSION_GRANTED result, and expects a single granted
 * {@code Permission} without the stream terminating.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscription_trigger_granted() {
    final String permission = Manifest.permission.READ_PHONE_STATE;
    when(mRxPermissions.isGranted(permission)).thenReturn(false);

    final TestObserver<Permission> observer = new TestObserver<>();
    final PublishSubject<Object> trigger = PublishSubject.create();
    trigger.compose(mRxPermissions.ensureEach(permission)).subscribe(observer);
    trigger.onNext(1);

    // Deliver the permission result for the request issued by the trigger.
    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(new Permission(permission, true));
}
/**
 * With the permission stubbed as not granted, fires the trigger through {@code ensure},
 * delivers a PERMISSION_GRANTED result, and expects a single {@code true} without the
 * stream terminating.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void subscription_trigger_granted() {
    final String permission = Manifest.permission.READ_PHONE_STATE;
    when(mRxPermissions.isGranted(permission)).thenReturn(false);

    final TestObserver<Boolean> observer = new TestObserver<>();
    final PublishSubject<Object> trigger = PublishSubject.create();
    trigger.compose(mRxPermissions.ensure(permission)).subscribe(observer);
    trigger.onNext(1);

    // Deliver the permission result for the request issued by the trigger.
    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(true);
}
/**
 * Subscribes a {@code DisposableIntentObserver} to an erroring Observable and expects an
 * exception whose cause carries the "View intents must not throw errors" message and wraps
 * the original exception. The subject's own observer must see neither values nor completion.
 */
@Test public void error() {
    PublishSubject intentSubject = PublishSubject.create();
    TestObserver observer = new TestObserver<>();
    intentSubject.subscribeWith(observer);

    Exception rootCause = new RuntimeException("I am the original Exception");
    // Built only to supply the expected message for the comparison below.
    Exception expectedWrapper =
            new IllegalStateException("View intents must not throw errors", rootCause);

    try {
        Observable.error(rootCause).subscribe(new DisposableIntentObserver(intentSubject));
        // Assert.fail throws an Error, so the catch (Exception) below does not swallow it.
        Assert.fail("Exception expected");
    } catch (Exception thrown) {
        Throwable wrapped = thrown.getCause();
        Assert.assertEquals(expectedWrapper.getMessage(), wrapped.getMessage());
        Assert.assertEquals(rootCause, wrapped.getCause());
    }

    observer.assertNotComplete();
    observer.assertNoValues();
}
/**
 * Disposes one of two groups and then emits on both sources: the observer in the disposed
 * group must not complete, while the observer in the other group receives its value
 * without errors.
 */
@Test public void shouldNotRemoveSubscribersForOtherIds() throws Exception {
    ObservableGroup disposedGroup = observableManager.newGroup();
    ObservableGroup otherGroup = observableManager.newGroup();

    PublishSubject<String> fooSource = PublishSubject.create();
    TestAutoResubscribingObserver fooObserverUnderTest = new TestAutoResubscribingObserver("foo");
    PublishSubject<String> barSource = PublishSubject.create();
    TestAutoResubscribingObserver barObserverUnderTest = new TestAutoResubscribingObserver("bar");

    fooSource.compose(disposedGroup.transform(fooObserverUnderTest)).subscribe(fooObserverUnderTest);
    barSource.compose(otherGroup.transform(barObserverUnderTest)).subscribe(barObserverUnderTest);

    // Only the first group is disposed; the second one stays active.
    disposedGroup.dispose();

    fooSource.onNext("Florinda Mesa");
    fooSource.onComplete();
    barSource.onNext("Carlos Villagran");
    barSource.onComplete();

    fooObserverUnderTest.assertionTarget.assertNotComplete();
    barObserverUnderTest.assertionTarget.assertNoErrors();
    barObserverUnderTest.assertionTarget.assertValue("Carlos Villagran");
}
/**
 * Disposes the observer itself before the source emits and expects that no value reaches it
 * while the group still reports observables for that observer.
 */
@Test public void testDisposingObserver() {
    ObservableGroup group = observableManager.newGroup();
    PublishSubject<String> source = PublishSubject.create();
    TestObserver<String> observer = new TestObserver<>();
    source.compose(group.transform(observer)).subscribe(observer);

    // Dispose first, emit afterwards.
    observer.dispose();
    source.onNext("Chespirito");

    observer.assertNoValues();
    assertThat(group.hasObservables(observer)).isEqualTo(true);
}
/**
 * Pushes the integers 0..999999 through a PublishSubject that is converted to a Flowable
 * with {@code BackpressureStrategy.MISSING}, buffered in groups of 10 and observed on the
 * computation scheduler. Buffers are logged under the "s" tag; errors go to {@code log}.
 */
private void demo4() {
    final PublishSubject<Integer> subject = PublishSubject.create();
    subject.toFlowable(BackpressureStrategy.MISSING)
            .buffer(10)
            .observeOn(Schedulers.computation())
            .subscribe(batch -> log("s", batch.toString()), this::log);

    int next = 0;
    while (next < 1000000) {
        subject.onNext(next);
        next++;
    }
}
/**
 * Subscribes through a group that has already been destroyed and expects the observer to
 * receive an {@link IllegalStateException}.
 */
@Test public void testAddThrowsAfterDestroyed() {
    ObservableGroup destroyedGroup = observableManager.newGroup();
    destroyedGroup.destroy();

    TestObserver<String> observer = new TestObserver<>();
    Observable<String> source = PublishSubject.create();
    source.compose(destroyedGroup.transform(observer)).subscribe(observer);

    observer.assertError(IllegalStateException.class);
}
/**
 * Builds one observable per requested permission and concatenates them in argument order.
 *
 * <p>Granted and policy-revoked permissions resolve immediately via {@code Observable.just}.
 * Every other permission is backed by a {@link PublishSubject} kept in
 * {@code currentPermissionRequests}. A subject is created only when none is registered yet,
 * and only those newly registered permissions are passed to {@code startShadowActivity}.
 *
 * @param permissions the permissions to request
 * @return the concatenation of the per-permission observables
 */
@NonNull @CheckReturnValue @TargetApi(M) Observable<Permission> requestOnM(@NonNull final String... permissions) {
    final List<Observable<Permission>> results = new ArrayList<>(permissions.length);
    final List<String> toRequest = new ArrayList<>();

    for (final String permission : permissions) {
        if (isGranted(permission)) {
            results.add(Observable.just(Permission.granted(permission)));
            continue;
        }
        if (isRevokedByPolicy(permission)) {
            results.add(Observable.just(Permission.revokedByPolicy(permission)));
            continue;
        }

        // Reuse the subject of a request that is already registered, if there is one.
        PublishSubject<Permission> pending = currentPermissionRequests.get(permission);
        if (pending == null) {
            toRequest.add(permission);
            pending = PublishSubject.create();
            currentPermissionRequests.put(permission, pending);
        }
        results.add(pending);
    }

    if (!toRequest.isEmpty()) {
        startShadowActivity(toRequest.toArray(new String[0]));
    }

    return Observable.concat(Observable.fromIterable(results));
}
/**
 * Subscribes to a two-second {@code Single.timer} bounded by the first item of a subject,
 * disposes the subscription immediately, and then emits on the subject.
 */
@Test
public void test() {
    // Parameterized rather than raw, so the calls below are type-checked.
    PublishSubject<Integer> publishSubject = PublishSubject.create();

    Disposable disposable = Single.timer(1000 * 2, TimeUnit.MILLISECONDS)
            .takeUntil(publishSubject.firstOrError())
            .subscribe(
                    data -> System.out.println("ted"),
                    throwable -> System.err.println("ted" + throwable.toString())
            );

    disposable.dispose();
    publishSubject.onNext(1); // emit end of lifecycle
}
/**
 * Sends one effect through {@code Transformers.fromConsumer} bound to a {@code TestScheduler}
 * and checks that the consumer holds no value until the scheduler's actions are triggered,
 * after which it holds the emitted effect.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
        throws Exception {
    PublishSubject<String> effects = PublishSubject.create();
    TestConsumer<String> sink = new TestConsumer<>();
    TestScheduler testScheduler = new TestScheduler();
    effects.compose(Transformers.fromConsumer(sink, testScheduler)).subscribe();

    effects.onNext("First Time");
    // Before the scheduler's actions are triggered the consumer is expected to hold nothing.
    assertThat(sink.getCurrentValue(), is(equalTo(null)));

    testScheduler.triggerActions();
    assertThat(sink.getCurrentValue(), is("First Time"));
}
/**
 * Returns an observable backed by a new subject. On each subscription a request code is
 * generated, the subject is stored in {@code mSubjects} under that code, and the activity
 * is started for a result with that code.
 *
 * @param intent the intent passed to {@code startActivityForResult}
 * @return the subject with the launch-on-subscribe hook attached
 */
public Observable<ActivityResultInfo> startForResult(final Intent intent) {
    final PublishSubject<ActivityResultInfo> resultSubject = PublishSubject.create();

    final Consumer<Disposable> launchOnSubscribe = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            final int code = generateRequestCode();
            mSubjects.put(code, resultSubject);
            startActivityForResult(intent, code);
        }
    };

    return resultSubject.doOnSubscribe(launchOnSubscribe);
}
/**
 * Creates a subject for the given renderer class and view id and stores it in
 * {@code mChildViewsLongClickBinding}, creating the per-class map on first use. A subject
 * previously stored for the same class and view id is replaced.
 *
 * @param clazz  the renderer class the binding belongs to
 * @param viewId the id of the child view
 * @return the newly created subject
 */
@SuppressWarnings("unused")
@SuppressLint("UseSparseArrays")
public final <X extends AutoViewHolder, Y extends IRenderer<X>> PublishSubject<ItemInfo<Y, X>>
longClicks(@NonNull final Class<Y> clazz, @IdRes final int viewId) {
    Map<Integer, PublishSubject> bindingsForRenderer = mChildViewsLongClickBinding.get(clazz);
    if (bindingsForRenderer == null) {
        bindingsForRenderer = new HashMap<>();
        mChildViewsLongClickBinding.put(clazz, bindingsForRenderer);
    }

    final PublishSubject<ItemInfo<Y, X>> longClickSubject = PublishSubject.create();
    bindingsForRenderer.put(viewId, longClickSubject);
    return longClickSubject;
}
/**
 * Demonstrates subscription timing on a PublishSubject: 1 and 2 are emitted before the
 * observer subscribes, 10 and 20 afterwards, followed by completion. The observer prints
 * each callback to standard output.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Observer<Long> printingObserver = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Long value) {
            System.out.println(":" + value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("It's Done");
        }
    };

    final PublishSubject<Long> subject = PublishSubject.create();

    // Emitted before anyone is subscribed.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(printingObserver);

    // Emitted after the observer has subscribed.
    subject.onNext(10L);
    subject.onNext(20L);
    subject.onComplete();
}
/**
 * Checks that {@code appendStderr} consumes the observable it is given: a mapping stage
 * records whether any item passed through it, data is emitted after the DAO call, and the
 * flag must then be set.
 */
@Test
public void testPersistStderrReadsDataFromObservable() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();

    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
    // The map stage only runs when a downstream subscriber receives an item.
    final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
        stderrObsWasRead.set(true);
        return data;
    });

    dao.appendStderr(jobId, stderrObs);

    // Without an emission the flag can never be set.
    // NOTE(review): assumes appendStderr has subscribed by the time it returns -- confirm.
    stderrSubject.onNext(new byte[]{1, 2, 3});
    stderrSubject.onComplete();

    // assertThat(boolean) alone verifies nothing; the terminal isTrue() performs the check.
    assertThat(stderrObsWasRead.get()).isTrue();
}
/**
 * Runs {@code echo ${toJSON(inputs)}} through the executor, collects everything written to
 * stdout, and checks that the echoed text is JSON-equal to the standard request's inputs.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for stdout to complete
 * @throws IOException declared by the original signature
 */
@Test
public void testExecuteEvaluatesToJSONFunctionAsExpected() throws InterruptedException, IOException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${toJSON(inputs)}");

    // Accumulate every chunk written to stdout.
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken up front, so the next acquire blocks until stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);
    jobExecutor.execute(req, listeners);

    // Report a timeout as such instead of comparing partial output against the expected JSON.
    final boolean stdoutCompleted = s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);
    if (!stdoutCompleted) {
        throw new AssertionError("Timed out waiting for stdout to complete");
    }

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
    TestHelpers.assertJSONEqual(stringFromStdout, toJSON(STANDARD_REQUEST.getInputs()));
}
/**
 * Disposes a group, emits a value and completion on the source, destroys the group through
 * the manager, and then checks that the group reports no observables for
 * {@code fooObserver}.
 */
@Test public void shouldClearQueuedResults() throws Exception {
    ObservableGroup group = observableManager.newGroup();
    TestObserver<String> observer = new TestObserver<>();
    PublishSubject<String> source = PublishSubject.create();
    source.compose(group.transform(observer)).subscribe(observer);

    // Emissions happen after the group was disposed.
    group.dispose();
    source.onNext("Hello");
    source.onComplete();

    observableManager.destroy(group);

    // NOTE(review): this checks fooObserver (declared outside this method), not the observer
    // subscribed above -- confirm that is the intended subject of the assertion.
    assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
/** Private constructor that assigns a newly created {@code PublishSubject} to {@code publishSubject}. */
private FragmentResultPublisherImpl() {
    this.publishSubject = PublishSubject.create();
}
/**
 * Exposes the events of an {@link EventPublisher} as an {@link Observable}.
 *
 * <p>A serialized {@code PublishSubject} is registered with the publisher via
 * {@code onEvent}; every event passed to that callback is forwarded to the subject's
 * {@code onNext}.
 *
 * @param eventPublisher the event publisher to listen to
 * @param <T> the type of the event
 * @return the serialized subject that relays the publisher's events
 */
public static <T> Observable<T> toObservable(EventPublisher<T> eventPublisher) {
    final Subject<T> events = PublishSubject.<T>create().toSerialized();
    eventPublisher.onEvent(events::onNext);
    return events;
}