类io.reactivex.subjects.ReplaySubject源码实例Demo

下面列出了怎么用io.reactivex.subjects.ReplaySubject的API类实例代码及写法，或者点击链接到GitHub查看源代码。

/**
 * Demonstrates a {@code ReplaySubject} created with
 * {@code createWithTimeAndSize(5, SECONDS, computation, 3)}: the values 1..9 are
 * emitted one second apart before anyone subscribes, then a subscriber is attached
 * and one more value (10) plus completion is emitted.
 */
@Test
void replaySubject_conf_test() {
    // Alternative configurations:
    //ReplaySubject<Object> replaySubject = ReplaySubject.createWithSize(5); // buffers only the last 5 items emitted before subscription
    //ReplaySubject<Object> replaySubject = ReplaySubject.createWithTime(5,TimeUnit.SECONDS, Schedulers.computation()); // buffers only items emitted within the 5 seconds before subscription
    ReplaySubject<Object> replaySubject = ReplaySubject.createWithTimeAndSize(
            5, TimeUnit.SECONDS, Schedulers.computation(), 3); // combines the two limits described above

    // A primitive counter avoids unboxing/re-boxing a Long on every comparison and
    // increment; the value is boxed exactly once when it is passed to onNext.
    for (long i = 1L; i < 10L; i++) {
        replaySubject.onNext(i);
        sleep(1, TimeUnit.SECONDS);
    }

    replaySubject.subscribe(x -> log("一郎神: " + x),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            disposable -> System.out.println("onSubscribe")
    );
    // Emitted after subscription, so it is delivered live rather than replayed.
    replaySubject.onNext(10L);
    replaySubject.onComplete();

}
 
/**
 * Attaches one observer before and one observer after the values 1..4 and the
 * completion signal are pushed into a {@code ReplaySubject}.
 */
private void doSomeWork() {

    final ReplaySubject<Integer> replay = ReplaySubject.create();

    // Subscribed up front: receives 1, 2, 3, 4 as they are emitted.
    replay.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        replay.onNext(value);
    }
    replay.onComplete();

    // Subscribed after completion: the subject replays 1, 2, 3, 4 to it as well.
    replay.subscribe(getSecondObserver());

}
 
源代码3 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Disposes the proxy before the subject emits anything, then checks that an observer
 * subscribed afterwards still sees the value and the completion.
 */
@Test public void testUnsubscribeBeforeEmit() {
  ReplaySubject<String> source = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  TestObserver<String> disposedObserver = new TestObserver<>();
  proxy.subscribe(disposedObserver);
  proxy.dispose();

  // Nothing has been emitted yet, so the disposed observer must be empty.
  disposedObserver.assertNotComplete();
  disposedObserver.assertNoValues();

  source.onNext("Avanti!");
  source.onComplete();

  // Disposed observers may not be reused in RxJava2, hence a fresh instance.
  TestObserver<String> lateObserver = new TestObserver<>();
  proxy.subscribe(lateObserver);
  lateObserver.assertComplete();
  lateObserver.assertValue("Avanti!");
}
 
源代码4 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Emits while the proxy is disposed and checks that a later subscriber receives the
 * value once it has (re)subscribed.
 */
@Test public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() {
  ReplaySubject<String> source = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  TestObserver<String> disposedObserver = new TestObserver<>();
  proxy.subscribe(disposedObserver);
  proxy.dispose();

  disposedObserver.assertNoValues();

  // Emitted while nobody is subscribed through the proxy.
  source.onNext("Avanti!");
  source.onComplete();

  // Disposed observers may not be reused in RxJava2, hence a fresh instance.
  TestObserver<String> resubscribed = new TestObserver<>();
  proxy.subscribe(resubscribed);

  resubscribed.awaitTerminalEvent(3, TimeUnit.SECONDS);
  resubscribed.assertValue("Avanti!");
}
 
源代码5 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Use case: when an activity is rotated, ObservableManager re-subscribes the original
 * request's Observable to a new Observer that belongs to the new activity instance.
 * Any previous results should then be redelivered (as long as the request is still
 * managed by ObservableManager).
 */
@Test public void shouldRedeliverSameResultsToDifferentSubscriber() {
  ReplaySubject<String> source = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  TestObserver<String> original = new TestObserver<>();
  proxy.subscribe(original);

  source.onNext("Avanti!");
  source.onComplete();

  proxy.dispose();

  // A second, independent observer subscribes after the first one was disposed.
  TestObserver<String> replacement = new TestObserver<>();
  proxy.subscribe(replacement);

  replacement.awaitTerminalEvent(3, TimeUnit.SECONDS);
  replacement.assertComplete();
  replacement.assertValue("Avanti!");

  // The original observer saw the same result while it was subscribed.
  original.assertComplete();
  original.assertValue("Avanti!");
}
 
/**
 * Pushes two values into an unbounded {@code ReplaySubject} before subscribing, then
 * one more value and the completion signal after subscribing.
 */
@Test
void replaySubject_test() {
    final ReplaySubject<Object> subject = ReplaySubject.create();

    // Emitted before any subscriber exists.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(
            item -> log("一郎神: " + item),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            d -> System.out.println("onSubscribe"));

    // Emitted while the subscriber is attached.
    subject.onNext(10L);
    subject.onComplete();

}
 
源代码7 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
/**
 * Uses a {@code ReplaySubject} in both of its roles: as the Observer of a source
 * Observable and as the Observable that two consumers subscribe to.
 */
@Test
public void testSubjectExample() {

    // The subject sits between the source and the two consumers below.
    ReplaySubject<Integer> relay = ReplaySubject.create();

    // Observer role: the subject subscribes to a source emitting the integers 1..5,
    // with the subscription performed on the io scheduler.
    // NOTE(review): nothing in this test waits for that scheduler's work to finish —
    // confirm whether the printed output is expected to be complete.
    Observable.range(1, 5)
            .subscribeOn(Schedulers.io())
            .subscribe(relay);

    // Observable role: each consumer receives the items that pass through the subject.
    relay.subscribe(item -> System.out.println("subscriber one: " + item));
    relay.subscribe(item -> System.out.println("subscriber two: " + item));
}
 
源代码8 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
/**
 * Routes a mapped source through a single {@code ReplaySubject} so that two consumers
 * share one subscription to the source.
 */
@Test
public void testSubjectMulticastExample() {

    // The subject that both consumers will subscribe to.
    ReplaySubject<Integer> relay = ReplaySubject.create();

    // Source: the integers 1..5, each squared (and logged) before being passed on.
    // The subject is the only direct subscriber of this chain.
    Observable.range(1, 5)
            .subscribeOn(Schedulers.io())
            .map(n -> {
                System.out.println(String.format("Squaring %d with itself", n));
                return n * n;
            })
            .subscribe(relay);

    // Two consumers attached to the subject rather than to the source.
    relay.subscribe(item -> System.out.println("subscriber one: " + item));
    relay.subscribe(item -> System.out.println("subscriber two: " + item));
}
 
源代码9 项目: jobson   文件: JobManagerTest.java
/**
 * Submits a job whose mock executor is backed by a stdout subject that already holds
 * one chunk of random bytes, and waits up to one second for the stdout listener to
 * receive a chunk.
 */
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
    final Subject<byte[]> stdout = ReplaySubject.create();
    final byte[] expected = generateRandomBytes();
    stdout.onNext(expected);

    final JobExecutor executor = MockJobExecutor.thatUses(stdout, Observable.never());
    final JobManager manager = createManagerWith(executor);

    // Starts with its single permit taken; a listener callback hands it back.
    final Semaphore received = new Semaphore(1);
    received.acquire();

    final Observer<byte[]> stdoutObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {}

        @Override
        public void onNext(@NonNull byte[] chunk) {
            assertThat(chunk).isEqualTo(expected);
            received.release();
        }

        @Override
        public void onError(@NonNull Throwable error) {
            fail("Error from observable");
            received.release();
        }

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStdoutListener(stdoutObserver);

    manager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean signalled = received.tryAcquire(1, SECONDS);
    if (!signalled) {
        fail("Timed out before any bytes received");
    }
}
 
源代码10 项目: jobson   文件: JobManagerTest.java
/**
 * Submits a job whose mock executor is backed by a stderr subject that already holds
 * one chunk of random bytes, and waits up to one second for the stderr listener to
 * receive a chunk.
 */
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
    final Subject<byte[]> stderrSubject = ReplaySubject.create();
    final byte[] expected = generateRandomBytes();
    stderrSubject.onNext(expected);

    final JobExecutor executor = MockJobExecutor.thatUses(Observable.never(), stderrSubject);
    final JobManager manager = createManagerWith(executor);

    // Starts with its single permit taken; a listener callback hands it back.
    final Semaphore received = new Semaphore(1);
    received.acquire();

    final Observer<byte[]> stderrObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {}

        @Override
        public void onNext(@NonNull byte[] chunk) {
            assertThat(chunk).isEqualTo(expected);
            received.release();
        }

        @Override
        public void onError(@NonNull Throwable error) {
            fail("Error from observable");
            received.release();
        }

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStderrListener(stderrObserver);

    manager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean signalled = received.tryAcquire(1, SECONDS);
    if (!signalled) {
        fail("Timed out before any bytes received");
    }
}
 
源代码11 项目: scava   文件: AbstractDataSet.java
/**
 * Creates a data set in the {@code CREATED} state with a zero count, remembering
 * {@code max} and passing it to {@code ReplaySubject.create(...)}.
 *
 * @param max value stored in {@code this.max} and handed to the subject factory
 */
public AbstractDataSet(Integer max) {
	this.max = max;
	this.count = 0;
	this.status = Status.CREATED;
	this.tester = new LoggerUtil();
	this.subject = ReplaySubject.create(max);
}
 
源代码12 项目: scava   文件: AbstractDataSet.java
/**
 * Creates a data set in the {@code CREATED} state with a zero count, no maximum
 * ({@code max} is {@code null}) and a subject from {@code ReplaySubject.create()}.
 */
public AbstractDataSet() {
	this.max = null;
	this.count = 0;
	this.status = Status.CREATED;
	this.tester = new LoggerUtil();
	this.subject = ReplaySubject.create();
}
 
源代码13 项目: scava   文件: AbstractDataSet.java
/**
 * Creates a data set in the {@code CREATED} state whose maximum is derived from a
 * paging policy.
 *
 * <p>When the policy declares a max, {@code this.max} is
 * {@code perIteration (or 1 if absent) * increment * max}; otherwise it is
 * {@code null}. The subject is always initialised: with the computed maximum capped
 * at 1000 when one exists, and with the no-argument factory otherwise, matching the
 * no-arg constructor (previously the subject was left {@code null} in that case).
 *
 * @param policy paging policy queried for its max, per-iteration and increment values
 */
public AbstractDataSet(IPaged policy) {
	this.status = Status.CREATED;
	this.count = 0;
	this.tester = new LoggerUtil();
	this.max = policy.hasMax()
			? (policy.hasPerIteration() ? policy.perIteration() : 1) * policy.increment() * policy.max()
			: null;
	if (max != null) {
		// Never pass more than 1000 to the subject factory, however large the policy's total is.
		this.subject = ReplaySubject.create(Math.min(max, 1000));
	} else {
		// No maximum known: fall back to the same subject the no-arg constructor uses.
		this.subject = ReplaySubject.create();
	}
}
 
源代码14 项目: smart-farmer-android   文件: UploadManager.java
/**
 * Uploads multiple items.
 *
 * <p>The items are first removed from {@code needRemoved} and {@code errorTask}, then
 * split into those already present in {@code completedTask} (handled by
 * {@code noUpload}) and the rest (handled by {@code networkUpload}). Every response
 * and the completion signal are also pushed into a {@code ReplaySubject} that is
 * registered in {@code recorderList}.
 * NOTE(review): errors are not forwarded to that recorder — confirm this is intended.
 *
 * @param tList the items to upload; a copy is taken before processing
 * @return the upload responses, observed on the Android main thread
 */
@SuppressWarnings("unchecked")
public Observable<UploadResponse<T, R>> upload(List<T> tList) {
    checkUploadCallback();

    // Work on a snapshot so another thread modifying tList cannot disturb the iteration below.
    final List<T> snapshot = new ArrayList<>(tList);

    synchronized (monitor) {
        needRemoved.removeAll(snapshot);
        errorTask.removeAll(snapshot);
    }

    // Items that are already completed and need no network upload.
    final List<T> alreadyUploaded = new ArrayList<>();
    // Items that have not been uploaded yet.
    final List<T> pendingUpload = new ArrayList<>();

    synchronized (monitor) {
        for (T item : snapshot) {
            (completedTask.containsKey(item) ? alreadyUploaded : pendingUpload).add(item);
        }
    }

    final ReplaySubject<UploadResponse<T, R>> recorder = ReplaySubject.create();
    recorderList.add(recorder);

    // The boolean in each pair says whether its list needs the network path.
    return Observable.fromArray(Pair.create(false, alreadyUploaded), new Pair<>(true, pendingUpload))
            .flatMap(batch -> batch.first
                    ? networkUpload(batch.second)
                    : noUpload(batch.second))
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(recorder::onNext)
            .doOnComplete(recorder::onComplete);
}
 
源代码15 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Checks that an observer subscribed after a dispose receives both the value emitted
 * before the dispose and the value emitted after the resubscription.
 */
@Test public void shouldKeepDeliveringEventsAfterResubscribed() {
  ReplaySubject<String> source = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  TestObserver<String> firstObserver = new TestObserver<>();
  proxy.subscribe(firstObserver);
  source.onNext("Avanti 1");
  proxy.dispose();

  // A fresh observer takes over after the dispose.
  TestObserver<String> secondObserver = new TestObserver<>();
  proxy.subscribe(secondObserver);

  source.onNext("Avanti!");

  secondObserver.assertValues("Avanti 1", "Avanti!");
}
 
源代码16 项目: RxShell   文件: RxShellTest.java
/**
 * Prepares the mocked process/session pair used by the tests.
 *
 * <p>{@code rxProcess.open()} is stubbed to hand out {@code rxProcessSession} via a
 * {@code ReplaySubject}; the session's input, output and error streams are replaced by
 * mock streams, and {@code waitFor()}, {@code isAlive()} and {@code destroy()} are
 * stubbed on top of those streams.
 *
 * @throws Exception if the superclass setup fails
 */
@Before
public void setup() throws Exception {
    super.setup();
    // The session is pushed into a ReplaySubject so every open() call can obtain it.
    sessionPub = ReplaySubject.create();
    sessionPub.onNext(rxProcessSession);
    when(rxProcess.open()).thenAnswer(invocation -> {
        // waitFor() is (re-)stubbed on each open(); the emitter is captured so that
        // the stream listener and destroy() below can signal it later.
        when(rxProcessSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
        return sessionPub.firstOrError();
    });

    cmdStream = new MockOutputStream(new MockOutputStream.Listener() {
        @Override
        public void onNewLine(String line) {
            // A line consisting of "exit" plus the line separator closes the command
            // stream and signals the captured waitFor emitter with 0.
            if (line.equals("exit" + LineReader.getLineSeparator())) {
                try {
                    cmdStream.close();
                } catch (IOException e) {
                    Timber.e(e);
                } finally {
                    // Signalled even when close() failed.
                    waitForEmitter.onSuccess(0);
                }
            }
        }

        @Override
        public void onClose() {

        }
    });
    outputStream = new MockInputStream();
    errorStream = new MockInputStream();

    when(rxProcessSession.input()).thenReturn(cmdStream);
    when(rxProcessSession.output()).thenReturn(outputStream);
    when(rxProcessSession.error()).thenReturn(errorStream);
    // isAlive() reports whether the command stream is open at subscription time.
    when(rxProcessSession.isAlive()).thenReturn(Single.create(e -> e.onSuccess(cmdStream.isOpen())));

    // destroy() closes the command stream and signals the captured waitFor emitter with 1.
    when(rxProcessSession.destroy()).then(invocation -> Completable.create(e -> {
        cmdStream.close();
        waitForEmitter.onSuccess(1);
        e.onComplete();
    }));
}
 
/**
 * Pushes two values into a {@code ReplaySubject} before subscribing a printing
 * observer, then one more value and the completion signal afterwards.
 *
 * @param args unused
 */
public static void main(String[] args) {
	final Observer<Long> printer = new Observer<Long>() {

		@Override
		public void onSubscribe(Disposable d) {
			System.out.println("onSubscribe");
		}

		@Override
		public void onNext(Long item) {
			System.out.println(":"+item);
		}

		@Override
		public void onError(Throwable error) {
			error.printStackTrace();
		}

		@Override
		public void onComplete() {
			System.out.println("It's Done");
		}
	};

	final ReplaySubject<Long> subject = ReplaySubject.create();
	// Emitted before the observer is attached.
	subject.onNext(1L);
	subject.onNext(2L);
	subject.subscribe(printer);
	// Emitted while the observer is attached.
	subject.onNext(10L);
	subject.onComplete();

}
 
源代码18 项目: AndroidGodEye   文件: Network.java
/**
 * Creates the subject for {@code NetworkInfo} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<NetworkInfo> createSubject() {
    return ReplaySubject.<NetworkInfo>create();
}
 
源代码19 项目: AndroidGodEye   文件: ViewCanary.java
/**
 * Creates the subject for {@code ViewIssueInfo} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<ViewIssueInfo> createSubject() {
    return ReplaySubject.<ViewIssueInfo>create();
}
 
源代码20 项目: AndroidGodEye   文件: Pageload.java
/**
 * Creates the subject for {@code PageLifecycleEventInfo} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<PageLifecycleEventInfo> createSubject() {
    return ReplaySubject.<PageLifecycleEventInfo>create();
}
 
源代码21 项目: AndroidGodEye   文件: Sm.java
/**
 * Creates the subject for {@code BlockInfo} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<BlockInfo> createSubject() {
    return ReplaySubject.<BlockInfo>create();
}
 
源代码22 项目: AndroidGodEye   文件: ImageCanary.java
/**
 * Creates the subject for {@code ImageIssue} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<ImageIssue> createSubject() {
    return ReplaySubject.<ImageIssue>create();
}
 
源代码23 项目: AndroidGodEye   文件: Crash.java
/**
 * Creates the subject for lists of {@code CrashInfo}.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<List<CrashInfo>> createSubject() {
    return ReplaySubject.<List<CrashInfo>>create();
}
 
源代码24 项目: AndroidGodEye   文件: LeakDetector.java
/**
 * Creates the subject for {@code LeakQueue.LeakMemoryInfo} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<LeakQueue.LeakMemoryInfo> createSubject() {
    return ReplaySubject.<LeakQueue.LeakMemoryInfo>create();
}
 
源代码25 项目: AndroidGodEye   文件: Leak.java
/**
 * Creates the subject for {@code LeakInfo} items.
 *
 * @return a new subject obtained from {@code ReplaySubject.create()}
 */
@Override
protected Subject<LeakInfo> createSubject() {
    return ReplaySubject.<LeakInfo>create();
}
 
源代码26 项目: scava   文件: Data.java
/**
 * Creates a {@code Data} instance in the {@code CREATED} state with a subject from
 * {@code ReplaySubject.create()}.
 */
public Data() {
	this.subject = ReplaySubject.create();
	this.status = Status.CREATED;
}
 
源代码27 项目: RHub   文件: RxJava2Proxies.java
/**
 * Builds a proxy around a new {@code ReplaySubject} using {@code Roxy.TePolicy.PASS}.
 *
 * @return a new {@code RxJava2SubjProxy} wrapping {@code ReplaySubject.create()}
 */
public static RxJava2SubjProxy replaySubjectProxy() {
    return new RxJava2SubjProxy(ReplaySubject.create(), Roxy.TePolicy.PASS);
}
 
源代码28 项目: RHub   文件: RxJava2Proxies.java
/**
 * Builds a proxy around a new {@code ReplaySubject} that has been passed through
 * {@code toSerialized()}, using {@code Roxy.TePolicy.PASS}.
 *
 * @return a new {@code RxJava2SubjProxy} wrapping the serialized subject
 */
public static RxJava2SubjProxy serializedReplaySubjectProxy() {
    return new RxJava2SubjProxy(ReplaySubject.create().toSerialized(), Roxy.TePolicy.PASS);
}
 
源代码29 项目: RHub   文件: RxJava2Proxies.java
/**
 * Builds a proxy around a new {@code ReplaySubject} using {@code Roxy.TePolicy.WRAP}.
 * NOTE(review): "safe" in the name presumably refers to the WRAP policy — confirm
 * against {@code Roxy.TePolicy}.
 *
 * @return a new {@code RxJava2SubjProxy} wrapping {@code ReplaySubject.create()}
 */
public static RxJava2SubjProxy safeReplaySubjectProxy() {
    return new RxJava2SubjProxy(ReplaySubject.create(), Roxy.TePolicy.WRAP);
}
 
源代码30 项目: RHub   文件: RxJava2Proxies.java
/**
 * Builds a proxy around a new {@code ReplaySubject} that has been passed through
 * {@code toSerialized()}, using {@code Roxy.TePolicy.WRAP}.
 * NOTE(review): "safe" in the name presumably refers to the WRAP policy — confirm
 * against {@code Roxy.TePolicy}.
 *
 * @return a new {@code RxJava2SubjProxy} wrapping the serialized subject
 */
public static RxJava2SubjProxy safeSerializedReplaySubjectProxy() {
    return new RxJava2SubjProxy(ReplaySubject.create().toSerialized(), Roxy.TePolicy.WRAP);
}
 
 类所在包
 同包方法