io.reactivex.subjects.PublishSubject#create() 源码实例 Demo

下面列出了 io.reactivex.subjects.PublishSubject#create() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Subscribes an observer through the group's transformer, locks the group, emits one value and
 * completes the source while the group is locked, then unlocks the group and checks that the
 * observer has terminated without error and holds exactly that value.
 */
@Test public void shouldAutoResubscribeAfterLockAndUnlock() {
  ObservableGroup observableGroup = observableManager.newGroup();
  TestObserver<String> observer = new TestObserver<>();
  PublishSubject<String> subject = PublishSubject.create();

  subject.compose(observableGroup.transform(observer)).subscribe(observer);

  // Everything between lock() and unlock() is emitted while the group is locked.
  observableGroup.lock();
  subject.onNext("Chespirito");
  subject.onComplete();
  observableGroup.unlock();

  observer.assertTerminated()
      .assertNoErrors()
      .assertValue("Chespirito");

  // NOTE(review): fooObserver is not declared in this method (presumably a fixture field);
  // confirm that checking it rather than the local observer is intended.
  assertThat(observableGroup.hasObservables(fooObserver)).isEqualTo(false);
}
 
源代码2 项目: jobson   文件: JobExecutorTest.java
/**
 * Executes a request built from the command {@code echo} with the argument
 * {@code ${outputDir}}, collects every chunk published on the stdout subject, and verifies
 * that the trimmed text is a path that exists on the filesystem.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on the semaphore
 */
@Test
public void testExecuteEvaluatesOutputDirAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${outputDir}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    // Append each stdout chunk to the bytes collected so far.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken up front and handed back when the stdout subject completes,
    // so the tryAcquire below waits for completion (or the timeout).
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail here on timeout instead of going on to inspect incomplete output.
    final boolean stdoutCompleted = s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);
    assertThat(stdoutCompleted).isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    // A bare assertThat(boolean) verifies nothing; isTrue() performs the actual check.
    assertThat(Files.exists(Paths.get(stringFromStdout))).isTrue();
}
 
源代码3 项目: RxPermissions   文件: RxPermissionsTest.java
/**
 * With the permission stubbed as not granted, fires the trigger through {@code ensureEach},
 * delivers a PERMISSION_GRANTED result, and expects a single granted {@link Permission}
 * without the stream terminating.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscription_trigger_granted() {
    final String permission = Manifest.permission.READ_PHONE_STATE;
    when(mRxPermissions.isGranted(permission)).thenReturn(false);

    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    final TestObserver<Permission> observer = new TestObserver<>();
    final PublishSubject<Object> requestTrigger = PublishSubject.create();

    requestTrigger.compose(mRxPermissions.ensureEach(permission)).subscribe(observer);
    requestTrigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(new Permission(permission, true));
}
 
源代码4 项目: RxPermissions   文件: RxPermissionsTest.java
/**
 * With the permission stubbed as not granted, fires the trigger through {@code ensure},
 * delivers a PERMISSION_GRANTED result, and expects the single value {@code true} without the
 * stream terminating.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void subscription_trigger_granted() {
    final String permission = Manifest.permission.READ_PHONE_STATE;
    when(mRxPermissions.isGranted(permission)).thenReturn(false);

    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    final TestObserver<Boolean> observer = new TestObserver<>();
    final PublishSubject<Object> requestTrigger = PublishSubject.create();

    requestTrigger.compose(mRxPermissions.ensure(permission)).subscribe(observer);
    requestTrigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(true);
}
 
源代码5 项目: mosby   文件: DisposableIntentObserverTest.java
/**
 * Subscribes a {@code DisposableIntentObserver} to an erroring Observable and expects an
 * exception whose cause carries the "View intents must not throw errors" message and wraps the
 * original exception. The observer attached to the subject must see neither values nor
 * completion.
 */
@Test public void error() {
  PublishSubject intentSubject = PublishSubject.create();
  TestObserver downstream = new TestObserver<>();
  intentSubject.subscribeWith(downstream);

  Exception originalException = new RuntimeException("I am the original Exception");
  Exception expectedException =
      new IllegalStateException("View intents must not throw errors", originalException);

  try {
    Observable.error(originalException).subscribe(new DisposableIntentObserver(intentSubject));
    // Reached only if subscribing did not throw.
    Assert.fail("Exception expected");
  } catch (Exception thrown) {
    Throwable wrapped = thrown.getCause();
    Assert.assertEquals(expectedException.getMessage(), wrapped.getMessage());
    Assert.assertEquals(originalException, wrapped.getCause());
  }

  downstream.assertNotComplete().assertNoValues();
}
 
源代码6 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Registers one observer in each of two groups, disposes only the first group, then emits and
 * completes on both sources. The first observer must not have completed; the second must have
 * received its value without error.
 */
@Test public void shouldNotRemoveSubscribersForOtherIds() throws Exception {
  ObservableGroup disposedGroup = observableManager.newGroup();
  ObservableGroup untouchedGroup = observableManager.newGroup();

  PublishSubject<String> fooSource = PublishSubject.create();
  PublishSubject<String> barSource = PublishSubject.create();
  TestAutoResubscribingObserver fooObserverUnderTest = new TestAutoResubscribingObserver("foo");
  TestAutoResubscribingObserver barObserverUnderTest = new TestAutoResubscribingObserver("bar");

  fooSource.compose(disposedGroup.transform(fooObserverUnderTest))
      .subscribe(fooObserverUnderTest);
  barSource.compose(untouchedGroup.transform(barObserverUnderTest))
      .subscribe(barObserverUnderTest);

  // Only the first group is disposed; the second one is left alone.
  disposedGroup.dispose();

  fooSource.onNext("Florinda Mesa");
  fooSource.onComplete();
  barSource.onNext("Carlos Villagran");
  barSource.onComplete();

  fooObserverUnderTest.assertionTarget.assertNotComplete();
  barObserverUnderTest.assertionTarget.assertNoErrors();
  barObserverUnderTest.assertionTarget.assertValue("Carlos Villagran");
}
 
源代码7 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Disposes the observer itself (not the group) and checks that a later emission is not
 * delivered to it while the group still reports observables for that observer.
 */
@Test public void testDisposingObserver() {
  ObservableGroup observableGroup = observableManager.newGroup();
  TestObserver<String> observer = new TestObserver<>();
  PublishSubject<String> subject = PublishSubject.create();

  subject.compose(observableGroup.transform(observer)).subscribe(observer);
  observer.dispose();

  // Emitted after disposal, so the observer must not record it.
  subject.onNext("Chespirito");
  observer.assertNoValues();

  assertThat(observableGroup.hasObservables(observer)).isEqualTo(true);
}
 
/**
 * Pushes one million integers through a PublishSubject that is viewed as a Flowable, grouped
 * into buffers of 10, and consumed on the computation scheduler.
 */
private void demo4() {
    final int emissionCount = 1000000;
    PublishSubject<Integer> source = PublishSubject.create();

    // NOTE(review): BackpressureStrategy.MISSING presumably leaves overflow unhandled, so the
    // error callback may fire once the producer outpaces observeOn; confirm that is the point
    // of this demo.
    source.toFlowable(BackpressureStrategy.MISSING)
            .buffer(10)
            .observeOn(Schedulers.computation())
            .subscribe(batch -> log("s", batch.toString()), this::log);

    int next = 0;
    while (next < emissionCount) {
        source.onNext(next);
        next++;
    }
}
 
源代码9 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Destroys the group first, then subscribes through its transformer and expects the observer
 * to receive an {@link IllegalStateException}.
 */
@Test public void testAddThrowsAfterDestroyed() {
  ObservableGroup destroyedGroup = observableManager.newGroup();
  destroyedGroup.destroy();

  TestObserver<String> testObserver = new TestObserver<>();
  Observable<String> upstream = PublishSubject.create();
  upstream.compose(destroyedGroup.transform(testObserver)).subscribe(testObserver);

  testObserver.assertError(IllegalStateException.class);
}
 
源代码10 项目: RxPermission   文件: RealRxPermission.java
/**
 * Builds one observable per requested permission and concatenates them into a single stream.
 *
 * <p>Granted and policy-revoked permissions are answered immediately. Every other permission is
 * backed by a subject stored in {@code currentPermissionRequests}; permissions that had no
 * subject yet are collected and passed to {@code startShadowActivity}.
 *
 * @param permissions the permissions to request
 * @return the concatenation of the per-permission observables, in argument order
 */
@NonNull @CheckReturnValue @TargetApi(M) Observable<Permission> requestOnM(@NonNull final String... permissions) {
  final List<Observable<Permission>> perPermission = new ArrayList<>(permissions.length);
  final List<String> notYetRequested = new ArrayList<>();

  for (final String permission : permissions) {
    if (isGranted(permission)) {
      perPermission.add(Observable.just(Permission.granted(permission)));
      continue;
    }
    if (isRevokedByPolicy(permission)) {
      perPermission.add(Observable.just(Permission.revokedByPolicy(permission)));
      continue;
    }

    // Reuse the subject of a request already in flight; otherwise register a new one.
    PublishSubject<Permission> pending = currentPermissionRequests.get(permission);
    if (pending == null) {
      notYetRequested.add(permission);
      pending = PublishSubject.create();
      currentPermissionRequests.put(permission, pending);
    }
    perPermission.add(pending);
  }

  if (!notYetRequested.isEmpty()) {
    startShadowActivity(notYetRequested.toArray(new String[0]));
  }

  return Observable.concat(Observable.fromIterable(perPermission));
}
 
源代码11 项目: akarnokd-misc   文件: TakeUntil.java
/**
 * Subscribes to a two-second {@code Single.timer} that is cut short by the subject's first
 * element, disposes the subscription immediately, and then emits on the subject.
 */
@Test
public void test() {
    // Typed rather than raw, so the takeUntil/subscribe chain below is checked by the compiler.
    PublishSubject<Integer> publishSubject = PublishSubject.create();

    Disposable disposable = Single.timer(1000 * 2, TimeUnit.MILLISECONDS)
            .takeUntil(publishSubject.firstOrError())
            .subscribe(
                    data -> System.out.println("ted"),
                    throwable -> System.err.println("ted" + throwable.toString())
            );
    disposable.dispose();

    publishSubject.onNext(1); // emit end of lifecycle
}
 
源代码12 项目: mobius   文件: TransformersTest.java
/**
 * Emits an effect through {@code Transformers.fromConsumer} bound to a {@link TestScheduler}
 * and checks that the consumer holds no value until the scheduler's actions are triggered,
 * after which it holds the emitted effect.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  TestScheduler testScheduler = new TestScheduler();
  TestConsumer<String> effectConsumer = new TestConsumer<>();
  PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromConsumer(effectConsumer, testScheduler)).subscribe();

  effects.onNext("First Time");

  // Nothing has run on the scheduler yet.
  assertThat(effectConsumer.getCurrentValue(), is(equalTo(null)));

  testScheduler.triggerActions();
  assertThat(effectConsumer.getCurrentValue(), is("First Time"));
}
 
源代码13 项目: AvoidOnResult   文件: AvoidOnResultFragment.java
/**
 * Returns an observable backed by a fresh subject. On each subscription a request code is
 * generated, the subject is stored in {@code mSubjects} under that code, and the intent is
 * started for a result.
 *
 * @param intent the intent passed to {@code startActivityForResult}
 * @return the subject with the launch-on-subscribe hook attached
 */
public Observable<ActivityResultInfo> startForResult(final Intent intent) {
    final PublishSubject<ActivityResultInfo> resultSubject = PublishSubject.create();

    final Consumer<Disposable> launchOnSubscribe = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable unused) throws Exception {
            final int code = generateRequestCode();
            // Register before launching, using the same code that is passed to the activity.
            mSubjects.put(code, resultSubject);
            startActivityForResult(intent, code);
        }
    };

    return resultSubject.doOnSubscribe(launchOnSubscribe);
}
 
源代码14 项目: AutoAdapter   文件: BaseAutoAdapter.java
/**
 * Creates a subject for long clicks on the given child view of the given renderer class and
 * stores it in {@code mChildViewsLongClickBinding}, replacing any subject previously stored
 * for the same class and view id.
 *
 * @param clazz  the renderer class the binding belongs to
 * @param viewId the id of the child view
 * @return the newly created subject
 */
@SuppressWarnings("unused")
@SuppressLint("UseSparseArrays")
public final <X extends AutoViewHolder, Y extends IRenderer<X>> PublishSubject<ItemInfo<Y, X>>
longClicks(@NonNull final Class<Y> clazz, @IdRes final int viewId) {
    Map<Integer, PublishSubject> bindingsForRenderer = mChildViewsLongClickBinding.get(clazz);
    if (bindingsForRenderer == null) {
        // First binding for this renderer class: create and register its per-view map.
        bindingsForRenderer = new HashMap<>();
        mChildViewsLongClickBinding.put(clazz, bindingsForRenderer);
    }

    final PublishSubject<ItemInfo<Y, X>> longClickSubject = PublishSubject.create();
    bindingsForRenderer.put(viewId, longClickSubject);
    return longClickSubject;
}
 
/**
 * Emits two values on a PublishSubject before an observer subscribes and two more plus a
 * completion afterwards; the observer prints each callback it receives.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Observer<Long> printer = new Observer<Long>() {

        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Long value) {
            System.out.println(":" + value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("It's Done");
        }
    };

    final PublishSubject<Long> subject = PublishSubject.create();

    // Emitted before the observer subscribes.
    // NOTE(review): per PublishSubject's documented behavior these are presumably not delivered.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(printer);

    // Emitted after the observer subscribes.
    subject.onNext(10L);
    subject.onNext(20L);
    subject.onComplete();
}
 
源代码16 项目: jobson   文件: JobsDAOTest.java
/**
 * Verifies that the DAO consumes the stderr observable handed to {@code appendStderr}: a
 * {@code map} probe flips a flag whenever an element passes through, and the flag must be set
 * after a chunk has been emitted.
 */
@Test
public void testPersistStderrReadsDataFromObservable() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
    // Probe: set only when an element flows through this operator to a subscriber.
    final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
        stderrObsWasRead.set(true);
        return data;
    });

    dao.appendStderr(jobId, stderrObs);

    // Without an emission the probe can never fire.
    // NOTE(review): assumes appendStderr subscribes before returning; confirm against the DAO.
    stderrSubject.onNext(new byte[]{1, 2, 3});

    // A bare assertThat(boolean) verifies nothing; isTrue() performs the actual check.
    assertThat(stderrObsWasRead.get()).isTrue();
}
 
源代码17 项目: jobson   文件: JobExecutorTest.java
/**
 * Executes a request built from the command {@code echo} with the argument
 * {@code ${toJSON(inputs)}}, collects every chunk published on the stdout subject, and
 * compares the trimmed text with the JSON form of the standard request's inputs.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on the semaphore
 * @throws IOException declared by the original signature
 */
@Test
public void testExecuteEvaluatesToJSONFunctionAsExpected() throws InterruptedException, IOException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${toJSON(inputs)}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();

    // Append each stdout chunk to the bytes collected so far.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken up front and handed back when the stdout subject completes,
    // so the tryAcquire below waits for completion (or the timeout).
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail here on timeout instead of comparing incomplete output against the expected JSON.
    final boolean stdoutCompleted = s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);
    assertThat(stdoutCompleted).isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    TestHelpers.assertJSONEqual(stringFromStdout, toJSON(STANDARD_REQUEST.getInputs()));
}
 
源代码18 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Disposes the group, lets the source emit and complete, destroys the group through the
 * manager, and then checks {@code hasObservables} on the group.
 */
@Test public void shouldClearQueuedResults() throws Exception {
  ObservableGroup observableGroup = observableManager.newGroup();
  TestObserver<String> observer = new TestObserver<>();
  PublishSubject<String> subject = PublishSubject.create();

  subject.compose(observableGroup.transform(observer)).subscribe(observer);
  observableGroup.dispose();

  // Emitted after the group has been disposed.
  subject.onNext("Hello");
  subject.onComplete();

  observableManager.destroy(observableGroup);

  // NOTE(review): fooObserver is not declared in this method (presumably a fixture field);
  // confirm that checking it rather than the local observer is intended.
  assertThat(observableGroup.hasObservables(fooObserver)).isEqualTo(false);
}
 
源代码19 项目: flowr   文件: FragmentResultPublisherImpl.java
/** Private constructor; initializes {@code publishSubject} with a new PublishSubject. */
private FragmentResultPublisherImpl() {
    this.publishSubject = PublishSubject.create();
}
 
源代码20 项目: resilience4j   文件: RxJava2Adapter.java
/**
 * Exposes the events of an {@code EventPublisher} as an {@code Observable}.
 *
 * <p>A new PublishSubject is wrapped with {@code toSerialized()}, and its {@code onNext} is
 * registered as the publisher's event consumer.
 *
 * @param eventPublisher the event publisher to listen to
 * @param <T>            the type of the event
 * @return the serialized subject that receives the publisher's events
 */
public static <T> Observable<T> toObservable(EventPublisher<T> eventPublisher) {
    final Subject<T> events = PublishSubject.<T>create().toSerialized();
    eventPublisher.onEvent(events::onNext);
    return events;
}