下面列出了 io.reactivex.subjects.ReplaySubject 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Demonstrates a {@code ReplaySubject} configured with both a time window and a size limit.
 *
 * <p>Nine values are pushed one second apart before anyone subscribes; a subscriber is then
 * attached, one more value is pushed and the subject is completed.
 */
@Test
void replaySubject_conf_test() {
    // Alternative: ReplaySubject.createWithSize(5)
    //   -> caches only the last 5 items emitted before the subscription.
    // Alternative: ReplaySubject.createWithTime(5, TimeUnit.SECONDS, Schedulers.computation())
    //   -> caches only the items emitted during the 5 seconds before the subscription.
    // The factory used below combines the two limits described above.
    ReplaySubject<Object> replaySubject =
            ReplaySubject.createWithTimeAndSize(5, TimeUnit.SECONDS, Schedulers.computation(), 3);

    long value = 1L;
    while (value < 10L) {
        replaySubject.onNext(value);
        sleep(1, TimeUnit.SECONDS);
        value++;
    }

    replaySubject.subscribe(
            x -> log("一郎神: " + x),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            disposable -> System.out.println("onSubscribe"));

    replaySubject.onNext(10L);
    replaySubject.onComplete();
}
/**
 * Emits 1 through 4 on a {@code ReplaySubject}, completes it, and subscribes one observer
 * before the emissions and another one after completion.
 */
private void doSomeWork() {
    ReplaySubject<Integer> source = ReplaySubject.create();

    // The first observer is attached up front, so it sees 1, 2, 3, 4 as they are emitted.
    source.subscribe(getFirstObserver());

    for (int item = 1; item <= 4; item++) {
        source.onNext(item);
    }
    source.onComplete();

    // The second observer attaches after completion; because this is a replay subject
    // it also receives 1, 2, 3, 4.
    source.subscribe(getSecondObserver());
}
/**
 * Disposes the proxy before the subject emits, then checks that a fresh observer subscribed
 * afterwards receives the value and the completion.
 */
@Test
public void testUnsubscribeBeforeEmit() {
    ReplaySubject<String> subject = ReplaySubject.create();
    SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    TestObserver<String> firstObserver = new TestObserver<>();
    proxy.subscribe(firstObserver);
    proxy.dispose();

    // Nothing has been emitted yet, so the disposed observer must be empty and not completed.
    firstObserver.assertNotComplete();
    firstObserver.assertNoValues();

    subject.onNext("Avanti!");
    subject.onComplete();

    // Disposed observers may not be reused in RxJava 2, hence a new instance.
    TestObserver<String> secondObserver = new TestObserver<>();
    proxy.subscribe(secondObserver);

    secondObserver.assertComplete();
    secondObserver.assertValue("Avanti!");
}
/**
 * Emits while no observer is attached to the proxy and checks that the value is delivered
 * to an observer that subscribes afterwards.
 */
@Test
public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() {
    ReplaySubject<String> subject = ReplaySubject.create();
    SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    TestObserver<String> disposedObserver = new TestObserver<>();
    proxy.subscribe(disposedObserver);
    proxy.dispose();
    disposedObserver.assertNoValues();

    // Emit and complete while the proxy has no active observer.
    subject.onNext("Avanti!");
    subject.onComplete();

    // Disposed observers may not be reused in RxJava 2, hence a new instance.
    TestObserver<String> resubscribedObserver = new TestObserver<>();
    proxy.subscribe(resubscribedObserver);

    resubscribedObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
    resubscribedObserver.assertValue("Avanti!");
}
/**
 * Checks that results already delivered to one observer are delivered again to a different
 * observer that subscribes to the same proxy later.
 *
 * <p>Use case: when an activity is rotated, ObservableManager re-subscribes the original
 * request's Observable to a new Observer that belongs to the new activity instance. In that
 * situation previous results may need to be redelivered (as long as the request is still
 * managed by ObservableManager).
 */
@Test
public void shouldRedeliverSameResultsToDifferentSubscriber() {
    ReplaySubject<String> subject = ReplaySubject.create();
    SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    TestObserver<String> originalObserver = new TestObserver<>();
    proxy.subscribe(originalObserver);

    subject.onNext("Avanti!");
    subject.onComplete();
    proxy.dispose();

    TestObserver<String> replacementObserver = new TestObserver<>();
    proxy.subscribe(replacementObserver);
    replacementObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);

    // The late subscriber is checked first, then the observer that was attached all along.
    replacementObserver.assertComplete();
    replacementObserver.assertValue("Avanti!");
    originalObserver.assertComplete();
    originalObserver.assertValue("Avanti!");
}
/**
 * Pushes two values into a {@code ReplaySubject} created with {@code ReplaySubject.create()},
 * subscribes, then pushes a third value and completes the subject.
 */
@Test
void replaySubject_test() {
    ReplaySubject<Object> subject = ReplaySubject.create();

    // Emitted before any subscriber exists.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(
            x -> log("一郎神: " + x),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            disposable -> System.out.println("onSubscribe"));

    // Emitted while the subscriber is attached.
    subject.onNext(10L);
    subject.onComplete();
}
/**
 * Shows a {@code ReplaySubject} acting both as an observer of an upstream Observable and as
 * an Observable for two downstream subscribers.
 *
 * <p>The upstream is subscribed on {@code Schedulers.io()}, so emission happens on another
 * thread; the test blocks at the end until the subject terminates so the output is
 * produced before the method returns.
 */
@Test
public void testSubjectExample() {
    /*
     * Step 1: Create an observable that emits the integers
     * from 1 to 5.
     */
    Observable<Integer> observable = Observable.range(1, 5)
            .subscribeOn(Schedulers.io());

    /*
     * Step 2: Create a subject that observes the emissions
     * of the observable. In this scenario the subject acts as
     * an observer, since it observes the changes of the Observable.
     */
    ReplaySubject<Integer> subject = ReplaySubject.create();
    observable.subscribe(subject);

    /*
     * Step 3: In this scenario the subject acts as an Observable,
     * since it emits each item received from the original Observable.
     */
    subject.subscribe(s -> System.out.println("subscriber one: " + s));
    subject.subscribe(s -> System.out.println("subscriber two: " + s));

    // Wait for the asynchronous upstream to finish; without this the test method can
    // return before anything has been emitted on the io scheduler.
    subject.blockingSubscribe();
}
/**
 * Shows that a {@code ReplaySubject} placed between a mapped Observable and two subscribers
 * is the single subscriber of the upstream, while both downstream subscribers read from it.
 *
 * <p>The upstream is subscribed on {@code Schedulers.io()}, so emission happens on another
 * thread; the test blocks at the end until the subject terminates so the output is
 * produced before the method returns.
 */
@Test
public void testSubjectMulticastExample() {
    /*
     * Step 1: Create an observable that emits the integers
     * from 1 to 5. Each item is squared before it is emitted.
     */
    Observable<Integer> observable = Observable.range(1, 5)
            .subscribeOn(Schedulers.io())
            .map(integer -> {
                System.out.println(String.format("Squaring %d with itself", integer));
                return integer * integer;
            });

    /*
     * Step 2: Create a subject that observes the emissions
     * of the observable.
     */
    ReplaySubject<Integer> subject = ReplaySubject.create();
    observable.subscribe(subject);

    /*
     * Step 3: Subscribe two subscribers to the subject.
     */
    subject.subscribe(s -> System.out.println("subscriber one: " + s));
    subject.subscribe(s -> System.out.println("subscriber two: " + s));

    // Wait for the asynchronous upstream to finish; without this the test method can
    // return before anything has been emitted on the io scheduler.
    subject.blockingSubscribe();
}
/**
 * Submits a job whose mock executor is given a stdout subject that already holds one chunk
 * of random bytes, and expects the stdout listener to receive exactly those bytes within
 * one second.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on the semaphore
 */
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
    final byte[] expectedStdoutBytes = generateRandomBytes();
    final Subject<byte[]> stdoutSubject = ReplaySubject.create();
    stdoutSubject.onNext(expectedStdoutBytes);

    final JobExecutor jobExecutor = MockJobExecutor.thatUses(stdoutSubject, Observable.never());
    final JobManager jobManager = createManagerWith(jobExecutor);

    // The single permit is taken immediately; the observer hands it back once it has been called.
    final Semaphore bytesReceived = new Semaphore(1);
    bytesReceived.acquire();

    final Observer<byte[]> stdoutObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(expectedStdoutBytes);
            bytesReceived.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            // NOTE(review): if fail(...) throws, the release() below is never reached and the
            // test ends through the timeout branch instead - confirm this is intended.
            fail("Error from observable");
            bytesReceived.release();
        }

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStdoutListener(stdoutObserver);

    jobManager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean receivedInTime = bytesReceived.tryAcquire(1, SECONDS);
    if (!receivedInTime) {
        fail("Timed out before any bytes received");
    }
}
/**
 * Submits a job whose mock executor is given a stderr subject that already holds one chunk
 * of random bytes, and expects the stderr listener to receive exactly those bytes within
 * one second.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on the semaphore
 */
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
    final byte[] stderrBytes = generateRandomBytes();
    final Subject<byte[]> stderr = ReplaySubject.create();
    stderr.onNext(stderrBytes);

    final JobExecutor jobExecutor = MockJobExecutor.thatUses(Observable.never(), stderr);
    final JobManager jobManager = createManagerWith(jobExecutor);

    // The single permit is taken immediately; the observer hands it back once it has been called.
    final Semaphore bytesReceived = new Semaphore(1);
    bytesReceived.acquire();

    final Observer<byte[]> stderrObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(stderrBytes);
            bytesReceived.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            // NOTE(review): if fail(...) throws, the release() below is never reached and the
            // test ends through the timeout branch instead - confirm this is intended.
            fail("Error from observable");
            bytesReceived.release();
        }

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStderrListener(stderrObserver);

    jobManager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean receivedInTime = bytesReceived.tryAcquire(1, SECONDS);
    if (!receivedInTime) {
        fail("Timed out before any bytes received");
    }
}
/**
 * Creates a data set in the {@code CREATED} status with a zero count and a replay subject
 * sized from {@code max}.
 *
 * @param max value stored in the {@code max} field and passed to
 *            {@code ReplaySubject.create(int)}; when {@code null} the subject is created
 *            with the no-argument factory instead, matching the no-argument constructor
 */
public AbstractDataSet(Integer max) {
    this.status = Status.CREATED;
    this.count = 0;
    this.tester = new LoggerUtil();
    this.max = max;
    if (max == null) {
        // Passing a null Integer to create(int) would fail with a NullPointerException
        // during unboxing, so fall back to the factory the no-argument constructor uses.
        this.subject = ReplaySubject.create();
    } else {
        this.subject = ReplaySubject.create(max);
    }
}
/**
 * Creates a data set in the {@code CREATED} status with a zero count, no maximum
 * ({@code max} is {@code null}) and a subject obtained from {@code ReplaySubject.create()}.
 */
public AbstractDataSet() {
this.status = Status.CREATED;
this.count = 0;
this.tester = new LoggerUtil();
// null marks "no maximum" for this instance.
this.max = null;
this.subject = ReplaySubject.create();
}
/**
 * Creates a data set in the {@code CREATED} status whose maximum is derived from a paging
 * policy.
 *
 * <p>When the policy has a max, {@code max} becomes
 * {@code perIteration (or 1 if absent) * increment * policy max}; otherwise it is
 * {@code null}. The subject is always initialised: with that value (capped at 1000) as the
 * argument of {@code ReplaySubject.create(int)} when a maximum exists, and with
 * {@code ReplaySubject.create()} otherwise.
 *
 * @param policy paging policy queried for its max, per-iteration and increment values
 */
public AbstractDataSet(IPaged policy) {
    this.status = Status.CREATED;
    this.count = 0;
    this.tester = new LoggerUtil();
    this.max = policy.hasMax()
            ? (policy.hasPerIteration() ? policy.perIteration() : 1) * policy.increment() * policy.max()
            : null;
    if (this.max != null) {
        // Never request more than 1000 from the int-taking factory.
        this.subject = ReplaySubject.create(Math.min(this.max, 1000));
    } else {
        // Without a maximum, use the same factory as the no-argument constructor so that
        // subject is never left null.
        this.subject = ReplaySubject.create();
    }
}
/**
 * Uploads several items.
 *
 * <p>Items found as keys of {@code completedTask} are passed to {@code noUpload}; all other
 * items are passed to {@code networkUpload}. Every response, and the completion, is also
 * forwarded to a new {@code ReplaySubject} that is added to {@code recorderList}.
 *
 * @param tList the items to upload; it is copied before use
 * @return an Observable of the upload responses, observed on the Android main thread
 */
@SuppressWarnings("unchecked")
public Observable<UploadResponse<T, R>> upload(List<T> tList) {
    checkUploadCallback();

    // Work on a copy so that another thread modifying tList cannot disturb the iteration.
    final List<T> snapshot = new ArrayList<>(tList);

    synchronized (monitor) {
        needRemoved.removeAll(snapshot);
        errorTask.removeAll(snapshot);
    }

    // Items that need no network upload because they are already completed.
    final List<T> alreadyCompleted = new ArrayList<>();
    // Items that have not been uploaded yet.
    final List<T> pending = new ArrayList<>();

    synchronized (monitor) {
        for (T item : snapshot) {
            final List<T> bucket = completedTask.containsKey(item) ? alreadyCompleted : pending;
            bucket.add(item);
        }
    }

    final ReplaySubject<UploadResponse<T, R>> recorder = ReplaySubject.create();
    recorderList.add(recorder);

    // The boolean in each pair says whether its list has to go through the network upload.
    return Observable.fromArray(Pair.create(false, alreadyCompleted), new Pair<>(true, pending))
            .flatMap(pair -> pair.first
                    ? networkUpload(pair.second)
                    : noUpload(pair.second))
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(recorder::onNext)
            .doOnComplete(recorder::onComplete);
}
/**
 * Emits one value, disposes the proxy, subscribes a new observer, emits a second value and
 * checks that the new observer holds both values.
 */
@Test
public void shouldKeepDeliveringEventsAfterResubscribed() {
    ReplaySubject<String> subject = ReplaySubject.create();
    SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    TestObserver<String> firstObserver = new TestObserver<>();
    proxy.subscribe(firstObserver);
    subject.onNext("Avanti 1");
    proxy.dispose();

    // A fresh observer takes over after the dispose.
    TestObserver<String> secondObserver = new TestObserver<>();
    proxy.subscribe(secondObserver);
    subject.onNext("Avanti!");

    secondObserver.assertValues("Avanti 1", "Avanti!");
}
/**
 * Prepares the mocked process session used by the tests: a replaying session publisher,
 * mock command/output/error streams, and stubs for open, waitFor, isAlive and destroy.
 *
 * @throws Exception if {@code super.setup()} throws
 */
@Before
public void setup() throws Exception {
super.setup();
// The publisher is pre-loaded with the mocked session before anything subscribes to it.
sessionPub = ReplaySubject.create();
sessionPub.onNext(rxProcessSession);
// Each open() call re-stubs waitFor() with a Single whose emitter is stored in
// waitForEmitter, then answers with the first item of sessionPub.
when(rxProcess.open()).thenAnswer(invocation -> {
when(rxProcessSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
return sessionPub.firstOrError();
});
// Command stream: a line equal to "exit" plus the line separator closes the stream and,
// whether or not the close succeeds, signals 0 through waitForEmitter.
cmdStream = new MockOutputStream(new MockOutputStream.Listener() {
@Override
public void onNewLine(String line) {
if (line.equals("exit" + LineReader.getLineSeparator())) {
try {
cmdStream.close();
} catch (IOException e) {
// A failed close is only logged; the finally block still signals the emitter.
Timber.e(e);
} finally {
waitForEmitter.onSuccess(0);
}
}
}
@Override
public void onClose() {
}
});
outputStream = new MockInputStream();
errorStream = new MockInputStream();
// Wire the mock streams into the mocked session.
when(rxProcessSession.input()).thenReturn(cmdStream);
when(rxProcessSession.output()).thenReturn(outputStream);
when(rxProcessSession.error()).thenReturn(errorStream);
// isAlive() reports whether the command stream is still open.
when(rxProcessSession.isAlive()).thenReturn(Single.create(e -> e.onSuccess(cmdStream.isOpen())));
// destroy() closes the command stream, signals 1 through waitForEmitter and then completes.
when(rxProcessSession.destroy()).then(invocation -> Completable.create(e -> {
cmdStream.close();
waitForEmitter.onSuccess(1);
e.onComplete();
}));
}
/**
 * Demo entry point: emits two values on a {@code ReplaySubject}, subscribes a printing
 * observer, emits one more value and completes the subject.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    // Observer that prints every callback it receives.
    Observer<Long> printer = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Long value) {
            System.out.println(":" + value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("It's Done");
        }
    };

    ReplaySubject<Long> subject = ReplaySubject.create();

    // Emitted before the observer is attached.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(printer);

    // Emitted while the observer is attached.
    subject.onNext(10L);
    subject.onComplete();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<NetworkInfo>}, created with
 * the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<NetworkInfo> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<ViewIssueInfo>}, created
 * with the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<ViewIssueInfo> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<PageLifecycleEventInfo>},
 * created with the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<PageLifecycleEventInfo> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<BlockInfo>}, created with
 * the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<BlockInfo> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<ImageIssue>}, created with
 * the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<ImageIssue> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<List<CrashInfo>>}, created
 * with the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<List<CrashInfo>> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<LeakQueue.LeakMemoryInfo>},
 * created with the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<LeakQueue.LeakMemoryInfo> createSubject() {
return ReplaySubject.create();
}
/**
 * Supplies a new {@code ReplaySubject}, typed as {@code Subject<LeakInfo>}, created with
 * the no-argument {@code ReplaySubject.create()} factory.
 */
@Override
protected Subject<LeakInfo> createSubject() {
return ReplaySubject.create();
}
/**
 * Creates a {@code Data} instance in the {@code CREATED} status with a subject obtained
 * from {@code ReplaySubject.create()}.
 */
public Data(){
this.status = Status.CREATED;
this.subject = ReplaySubject.create();
}
/**
 * Builds an {@code RxJava2SubjProxy} around a new {@code ReplaySubject} using
 * {@code Roxy.TePolicy.PASS}.
 *
 * @return the newly created proxy
 */
public static RxJava2SubjProxy replaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create(), Roxy.TePolicy.PASS);
}
/**
 * Builds an {@code RxJava2SubjProxy} around the {@code toSerialized()} view of a new
 * {@code ReplaySubject} using {@code Roxy.TePolicy.PASS}.
 *
 * @return the newly created proxy
 */
public static RxJava2SubjProxy serializedReplaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create().toSerialized(), Roxy.TePolicy.PASS);
}
/**
 * Builds an {@code RxJava2SubjProxy} around a new {@code ReplaySubject} using
 * {@code Roxy.TePolicy.WRAP}.
 *
 * @return the newly created proxy
 */
public static RxJava2SubjProxy safeReplaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create(), Roxy.TePolicy.WRAP);
}
/**
 * Builds an {@code RxJava2SubjProxy} around the {@code toSerialized()} view of a new
 * {@code ReplaySubject} using {@code Roxy.TePolicy.WRAP}.
 *
 * @return the newly created proxy
 */
public static RxJava2SubjProxy safeSerializedReplaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create().toSerialized(), Roxy.TePolicy.WRAP);
}