下面列出了怎么用io.reactivex.subjects.Subject的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Verifies that disposing the {@link Disposable} returned by
 * {@code appendStdout} stops the DAO from reading further stdout data.
 */
@Test
public void testPersistStdoutReturnsADisposableThatStopsFurtherReads() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final AtomicBoolean stdoutObsWasRead = new AtomicBoolean(false);
    // The map step only runs when an item is pulled through by a subscriber,
    // so the flag records whether anything downstream read the data.
    final Observable<byte[]> stdoutObs = stdoutSubject.map(data -> {
        stdoutObsWasRead.set(true);
        return data;
    });

    final Disposable disposable = dao.appendStdout(jobId, stdoutObs);
    disposable.dispose();
    stdoutSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) checks nothing; the read flag must still be
    // false because the subscription was disposed before the data was emitted.
    assertThat(stdoutObsWasRead.get()).isFalse();
}
/**
 * Verifies that the stderr listener receives {@code onComplete} once the
 * executed application has finished.
 *
 * @throws Throwable propagated from the promise assertion helper
 */
@Test
public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable {
    final JobExecutor jobExecutor = getInstance();
    final AtomicBoolean completedCalled = new AtomicBoolean(false);
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe();
    final JobEventListeners listeners = createStderrListener(stderrSubject);
    final CancelablePromise<JobExecutionResult> ret =
            jobExecutor.execute(STANDARD_REQUEST, listeners);
    promiseAssert(ret, result -> {
        try {
            // The stderr thread can race with the exit thread
            Thread.sleep(50);
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
        }
        // Asserted outside the try block so an interrupted sleep can no
        // longer cause the check to be skipped silently.
        assertThat(completedCalled.get()).isTrue();
    });
}
/**
 * Runs {@code echo ${inputs.foo}} through the executor and checks that the
 * text echoed to stdout is the expected input value.
 */
@Test
public void testExecuteEvaluatesJobInputsAsExpected() throws InterruptedException {
    final JobExecutor executor = getInstance();
    final PersistedJob job = standardRequestWithCommand("echo", "${inputs.foo}");

    // Concatenate every stdout chunk as it arrives.
    final AtomicReference<byte[]> collectedStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdout = PublishSubject.create();
    stdout.subscribe(chunk ->
            collectedStdout.getAndUpdate(soFar -> Bytes.concat(soFar, chunk)));

    // The single permit is taken up front and handed back when stdout completes.
    final Semaphore stdoutCompleted = new Semaphore(1);
    stdoutCompleted.acquire();
    stdout.doOnComplete(stdoutCompleted::release).subscribe();

    final JobEventListeners stdoutListeners = createStdoutListener(stdout);
    executor.execute(job, stdoutListeners);
    stdoutCompleted.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);

    final String echoed = new String(collectedStdout.get()).trim();
    assertThat(echoed).isEqualTo("a"); // from spec
}
/**
 * Runs {@code echo ${join(',', inputs.someList)}} through the executor and
 * checks that stdout carries the comma-joined list.
 */
@Test
public void testExecuteEvaluatesJoinAsExpected() throws InterruptedException {
    final JobExecutor executor = getInstance();
    final PersistedJob job =
            standardRequestWithCommand("echo", "${join(',', inputs.someList)}");

    // Concatenate every stdout chunk as it arrives.
    final AtomicReference<byte[]> collectedStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdout = PublishSubject.create();
    stdout.subscribe(chunk ->
            collectedStdout.getAndUpdate(soFar -> Bytes.concat(soFar, chunk)));

    // The single permit is taken up front and handed back when stdout completes.
    final Semaphore stdoutCompleted = new Semaphore(1);
    stdoutCompleted.acquire();
    stdout.doOnComplete(stdoutCompleted::release).subscribe();

    final JobEventListeners stdoutListeners = createStdoutListener(stdout);
    executor.execute(job, stdoutListeners);
    stdoutCompleted.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);

    final String echoed = new String(collectedStdout.get()).trim();
    assertThat(echoed).isEqualTo("a,b,c,d"); // From the input fixture
}
/**
 * Runs {@code echo ${toString(inputs.someString)}} through the executor and
 * checks that stdout carries the expected string.
 */
@Test
public void testExecuteEvaluatesToStringAsExpected() throws InterruptedException {
    final JobExecutor executor = getInstance();
    final PersistedJob job =
            standardRequestWithCommand("echo", "${toString(inputs.someString)}");

    // Concatenate every stdout chunk as it arrives.
    final AtomicReference<byte[]> collectedStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdout = PublishSubject.create();
    stdout.subscribe(chunk ->
            collectedStdout.getAndUpdate(soFar -> Bytes.concat(soFar, chunk)));

    // The single permit is taken up front and handed back when stdout completes.
    final Semaphore stdoutCompleted = new Semaphore(1);
    stdoutCompleted.acquire();
    stdout.doOnComplete(stdoutCompleted::release).subscribe();

    final JobEventListeners stdoutListeners = createStdoutListener(stdout);
    executor.execute(job, stdoutListeners);
    stdoutCompleted.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);

    final String echoed = new String(collectedStdout.get()).trim();
    assertThat(echoed).isEqualTo("hello, world!"); // from input fixture
}
/**
 * Runs {@code echo ${outputDir}} through the executor and checks that the
 * path echoed to stdout exists on disk.
 */
@Test
public void testExecuteEvaluatesOutputDirAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${outputDir}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken up front and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);
    jobExecutor.execute(req, listeners);
    s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
    // A bare assertThat(...) verifies nothing; the existence check needs a
    // terminal assertion to have any effect.
    assertThat(Files.exists(Paths.get(stringFromStdout))).isTrue();
}
/**
 * Checks that bytes pushed onto the executor's stderr subject are observed
 * through the observable returned by {@code JobManager.stderrUpdates}.
 */
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    final CancelablePromise<JobExecutionResult> executionPromise = new SimpleCancelablePromise<>();
    final Subject<byte[]> executorStderr = PublishSubject.create();
    final JobExecutor mockExecutor = MockJobExecutor.thatUses(
            executionPromise,
            Observable.just(TestHelpers.generateRandomBytes()),
            executorStderr);
    final JobManager manager = createManagerWith(mockExecutor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission =
            manager.submit(STANDARD_VALID_REQUEST);

    // Record the most recent chunk seen on the manager's stderr stream.
    final Observable<byte[]> stderrUpdates = manager.stderrUpdates(submission.getLeft()).get();
    final AtomicReference<byte[]> lastObserved = new AtomicReference<>();
    stderrUpdates.subscribe(chunk -> lastObserved.set(chunk));

    final byte[] expected = TestHelpers.generateRandomBytes();
    executorStderr.onNext(expected);

    // Finish the job and wait for the manager to finalize it before asserting.
    executionPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastObserved.get()).isEqualTo(expected);
}
/**
 * Demo: two interval sources are both subscribed to one {@link PublishSubject},
 * and a single downstream observer logs what the subject relays.
 */
private static void demo2() {
    final Subject<Long> relay = PublishSubject.create();

    // Source one: 3 ticks, one every 2 seconds.
    final Observable<Long> originOne = Observable.interval(2, TimeUnit.SECONDS)
            .take(3)
            .doOnComplete(() -> log("Origin-One-doOnComplete"));
    originOne.subscribe(relay);

    // Source two: 2 ticks, one every second.
    final Observable<Long> originTwo = Observable.interval(1, TimeUnit.SECONDS)
            .take(2)
            .doOnComplete(() -> log("Origin-Two-doOnComplete"));
    originTwo.subscribe(relay);

    final Observable<Long> observed = relay.doOnComplete(() -> log("First-doOnComplete"));
    observed.subscribe(v -> log(v));
}
/**
 * Demo: one interval source feeds a {@link PublishSubject}; a first observer
 * subscribes immediately and a second one after a 4.1 second pause.
 *
 * @throws InterruptedException if the pause between subscriptions is interrupted
 */
private static void demo1() throws InterruptedException {
    final Subject<Long> relay = PublishSubject.create();

    // Upstream: 5 ticks, one every 2 seconds, relayed through the subject.
    final Observable<Long> origin = Observable.interval(2, TimeUnit.SECONDS)
            .take(5)
            .doOnSubscribe(d -> log("Original-doOnSubscribe"))
            .doOnComplete(() -> log("Original-doOnComplete"));
    origin.subscribe(relay);

    final Observable<Long> firstView = relay
            .doOnSubscribe(d -> log("First-doOnSubscribe"))
            .doOnComplete(() -> log("First-doOnComplete"));
    firstView.subscribe(v -> log("First: " + v));

    Thread.sleep(4100);

    final Observable<Long> secondView = relay
            .doOnSubscribe(d -> log("Second-doOnSubscribe"))
            .doOnComplete(() -> log("Second-doOnComplete"));
    secondView.subscribe(v -> log("Second: " + v));
}
/**
 * Revokes the refresh token and then the access token of {@code token}.
 *
 * <p>A fresh {@link PublishSubject} is handed to both {@code revokeSingleToken}
 * calls and returned to the caller. NOTE(review): presumably
 * {@code revokeSingleToken} forwards each HTTP response into that subject;
 * confirm against its implementation.
 *
 * @param token        the token pair to revoke
 * @param clientSecret the client secret placed in both revoke request bodies
 * @return the subject passed to both revocation calls
 */
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) {
// this complexity exists because the access token must be revoked AFTER the refresh token
// why? because the access token is needed for both revocations!
Subject<JsonElement> responses = PublishSubject.create();
RevokeReqBody refreshReqBody = RevokeReqBody.fromRefreshToken(
token.getRefreshToken(), clientSecret);
// Step 1: revoke the refresh token; the subscription is started immediately below.
revokeSingleToken(token.getAuthHeader(), refreshReqBody, responses)
.doOnComplete(() -> {
// Step 2: only once step 1 has completed, build and fire the access-token revocation.
RevokeReqBody accessReqBody = RevokeReqBody.fromAccessToken(
token.getAccessToken(), clientSecret);
revokeSingleToken(token.getAuthHeader(), accessReqBody, responses)
.subscribe();
})
.subscribe();
return responses;
}
/**
 * Forwards an incoming packet to the packet publisher stored on the channel
 * context. A {@link WsPacket} is only forwarded when {@code isWsEof()} is true;
 * any other packet type is always forwarded.
 *
 * @param packet the received packet
 * @param ctx    the channel context holding the publisher attribute
 * @throws Exception declared by the overridden method
 */
@Override
public void handler(Packet packet, ChannelContext ctx) throws Exception {
    // Skip WebSocket packets for which isWsEof() is false.
    final boolean incompleteWsPacket =
            packet instanceof WsPacket && !((WsPacket) packet).isWsEof();
    if (incompleteWsPacket) {
        return;
    }

    final Object publisherAttribute = ctx.getAttribute(WebSocketImpl.packetPublisherKey);
    final Subject<Packet> publisher = (Subject<Packet>) publisherAttribute;
    publisher.onNext(packet);
}
/**
 * Takes the next job off the queue, if any, wires its stdout/stderr streams
 * to the DAO and to the job's queued listeners, and submits it to the executor.
 * A failure while starting execution is logged and recorded as a
 * {@code FATAL_ERROR} status on the job.
 */
private void advanceJobQueue() {
    final QueuedJob nextJob = jobQueue.poll();
    if (nextJob == null) {
        return;
    }

    final Subject<byte[]> stdoutRelay = PublishSubject.create();
    final Subject<byte[]> stderrRelay = PublishSubject.create();

    // The DAO is attached before the job's own listeners, as in the original ordering.
    jobDAO.appendStdout(nextJob.getId(), stdoutRelay);
    jobDAO.appendStderr(nextJob.getId(), stderrRelay);
    stdoutRelay.subscribe(nextJob.getQueuedListeners().getOnStdoutListener());
    stderrRelay.subscribe(nextJob.getQueuedListeners().getOnStderrListener());

    try {
        final CancelablePromise<JobExecutionResult> execution =
                jobExecutor.execute(nextJob, JobEventListeners.create(stdoutRelay, stderrRelay));

        final ExecutingJob running =
                ExecutingJob.fromQueuedJob(nextJob, now(), stdoutRelay, stderrRelay);
        executingJobs.put(running.getId(), running);
        updateJobStatus(nextJob.getId(), RUNNING, "Submitted to executor");

        execution.thenAccept(result -> {
            onExecutionFinished(running, result);
        });

        // Cancelling the job's completion promise cancels the underlying execution.
        running.getCompletionPromise().onCancel(() -> {
            execution.cancel(true);
        });
    } catch (Throwable failure) {
        log.error("Error starting job execution: " + failure);
        updateJobStatus(nextJob.getId(), FATAL_ERROR, "Error executing job: " + failure);
    }
}
/**
 * Verifies that {@code appendStdout} actually consumes data emitted by the
 * observable it is given.
 */
@Test
public void testPersistStdoutReadsDataFromObservable() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final AtomicBoolean stdoutObsWasRead = new AtomicBoolean(false);
    // The map step only runs when an item is pulled through by a subscriber.
    final Observable<byte[]> stdoutObs = stdoutSubject.map(data -> {
        stdoutObsWasRead.set(true);
        return data;
    });

    dao.appendStdout(jobId, stdoutObs);

    // Without an emission the map step can never fire, so push one chunk
    // through the subject before checking the flag.
    stdoutSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; assert the flag explicitly.
    assertThat(stdoutObsWasRead.get()).isTrue();
}
/**
 * Verifies that {@code appendStderr} actually consumes data emitted by the
 * observable it is given.
 */
@Test
public void testPersistStderrReadsDataFromObservable() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
    // The map step only runs when an item is pulled through by a subscriber.
    final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
        stderrObsWasRead.set(true);
        return data;
    });

    dao.appendStderr(jobId, stderrObs);

    // Without an emission the map step can never fire, so push one chunk
    // through the subject before checking the flag.
    stderrSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; assert the flag explicitly.
    assertThat(stderrObsWasRead.get()).isTrue();
}
/**
 * Removes {@code observable} from the subjects registered under {@code tag}
 * and drops the tag's entry once its list is empty. Does nothing when the
 * tag has no registered subjects.
 *
 * @param tag        the key the subject was registered under
 * @param observable the subject instance to remove
 */
public void unregister(@NonNull Object tag, @NonNull Observable observable) {
    final List<Subject> registered = mSubjectsMapper.get(tag);
    if (registered == null) {
        return;
    }

    registered.remove(observable);
    if (registered.isEmpty()) {
        mSubjectsMapper.remove(tag);
    }

    if (DEBUG) {
        Timber.d("[unregister] mSubjectsMapper: " + mSubjectsMapper);
    }
}
/**
 * Runs {@code echo ${toFile(toJSON(inputs))}} through the executor and checks
 * that the echoed path points at an existing file whose content equals the
 * JSON form of the request inputs.
 */
@Test
public void testExecuteEvaluatesToFileAsExpected() throws InterruptedException, IOException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${toFile(toJSON(inputs))}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken up front and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);
    jobExecutor.execute(req, listeners);
    s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
    final Path p = Paths.get(stringFromStdout);
    // A bare assertThat(...) verifies nothing; assert existence explicitly so
    // a missing file is reported as an assertion failure.
    assertThat(p.toFile().exists()).isTrue();

    final String loadedJson = new String(Files.readAllBytes(p));
    TestHelpers.assertJSONEqual(loadedJson, toJSON(STANDARD_REQUEST.getInputs()));
}
/**
 * Checks that a stdout listener passed to {@code JobManager.submit} receives
 * the bytes the executor's stdout observable emits.
 */
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
    // A ReplaySubject is pre-loaded with the bytes before anything subscribes.
    final Subject<byte[]> executorStdout = ReplaySubject.create();
    final byte[] expected = generateRandomBytes();
    executorStdout.onNext(expected);

    final JobExecutor executor = MockJobExecutor.thatUses(executorStdout, Observable.never());
    final JobManager manager = createManagerWith(executor);

    // The single permit is taken up front and handed back by the observer callbacks.
    final Semaphore received = new Semaphore(1);
    received.acquire();

    final Observer<byte[]> stdoutObserver = new Observer<byte[]>() {
        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(expected);
            received.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            fail("Error from observable");
            received.release();
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStdoutListener(stdoutObserver);

    manager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean gotBytes = received.tryAcquire(1, SECONDS);
    if (!gotBytes) {
        fail("Timed out before any bytes received");
    }
}
/**
 * Checks that a stderr listener passed to {@code JobManager.submit} receives
 * the bytes the executor's stderr observable emits.
 */
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
    // A ReplaySubject is pre-loaded with the bytes before anything subscribes.
    final Subject<byte[]> executorStderr = ReplaySubject.create();
    final byte[] expected = generateRandomBytes();
    executorStderr.onNext(expected);

    final JobExecutor executor = MockJobExecutor.thatUses(Observable.never(), executorStderr);
    final JobManager manager = createManagerWith(executor);

    // The single permit is taken up front and handed back by the observer callbacks.
    final Semaphore received = new Semaphore(1);
    received.acquire();

    final Observer<byte[]> stderrObserver = new Observer<byte[]>() {
        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(expected);
            received.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            fail("Error from observable");
            received.release();
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStderrListener(stderrObserver);

    manager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean gotBytes = received.tryAcquire(1, SECONDS);
    if (!gotBytes) {
        fail("Timed out before any bytes received");
    }
}
/**
 * Creates a new {@link PublishSubject}, records it under {@code tag}, and
 * returns it so the caller can subscribe to events posted for that tag.
 *
 * @param tag the key under which the new subject is registered
 * @param <T> the event type the caller expects
 * @return the newly created subject
 */
@NonNull
public <T> Observable<T> register(@NonNull Object tag) {
    List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // First registration for this tag: create and store its list.
        registered = new ArrayList<>();
        subjectMapper.put(tag, registered);
    }

    final Subject<T> created = PublishSubject.create();
    registered.add(created);
    return created;
}
/**
 * Removes {@code observable} from the subjects registered under {@code tag}
 * and drops the tag's entry once its list is empty. Does nothing when the
 * tag has no registered subjects.
 *
 * @param tag        the key the subject was registered under
 * @param observable the subject instance to remove
 */
public void unregister(@NonNull Object tag, @NonNull Observable observable) {
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        return;
    }

    registered.remove(observable);
    if (registered.isEmpty()) {
        // Last subject for this tag is gone; forget the tag as well.
        subjectMapper.remove(tag);
    }
}
/**
 * Delivers {@code content} to every subject registered under {@code tag}.
 * Posting to a tag with no registered subjects is a no-op.
 *
 * @param tag     the key whose subjects should receive the event
 * @param content the event object passed to each subject's {@code onNext}
 */
public void post(@NonNull Object tag, @NonNull Object content) {
    List<Subject> subjects = subjectMapper.get(tag);
    // The lookup yields null for a tag that was never registered (or was
    // fully unregistered); guard against it instead of throwing an NPE.
    if (subjects == null || subjects.isEmpty()) {
        return;
    }
    for (Subject subject : subjects) {
        subject.onNext(content);
    }
}
/**
 * Returns the subject held in the {@code messageSubject} field.
 *
 * @return the {@link ListenerMessage} subject of this object
 */
public Subject<ListenerMessage> getMessageSubject() {
return messageSubject;
}
/**
 * Emits three ticks 100 ms apart, wraps each in a {@link Timed} via
 * {@code timeInterval()}, and prints every item. The receiver is an anonymous
 * {@link Subject} subclass that only does work in {@code onNext}.
 *
 * @throws InterruptedException if the final sleep is interrupted
 */
@Test
public void testTimeIntervalObservable() throws InterruptedException {
    final Subject<Timed<Long>> printer = new Subject<Timed<Long>>() {
        // --- Observer callbacks: only onNext does any work ---
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Timed<Long> sample) {
            System.out.println("onNext: " + sample);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }

        // --- Subject state queries: fixed stub answers ---
        @Override
        public boolean hasObservers() {
            return false;
        }

        @Override
        public boolean hasThrowable() {
            return false;
        }

        @Override
        public boolean hasComplete() {
            return false;
        }

        @Override
        public Throwable getThrowable() {
            return null;
        }

        @Override
        protected void subscribeActual(Observer<? super Timed<Long>> observer) {
        }
    };

    Observable.interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .timeInterval()
            .subscribe(printer);

    // Keep the test thread alive while the interval emits.
    Thread.sleep(3000);
}
/**
 * Returns the subject held in the {@code mEventEmitter} field.
 *
 * @return the {@link RxPayload} subject of this object
 */
@Override
public Subject<RxPayload> getEventSubject() {
return mEventEmitter;
}
/**
 * Returns the subject held in the {@code mEventEmitter} field.
 *
 * @return the {@link RxPayload} subject of this object
 */
@Override
public Subject<RxPayload> getEventSubject() {
return mEventEmitter;
}
/**
 * Returns the subject held in the {@code mEventEmitter} field.
 *
 * @return the {@link RxPayload} subject of this object
 */
public Subject<RxPayload> getEventSubject() {
return mEventEmitter;
}
/**
 * Requests the Gank Android feed through {@code OKHttpUtilsRx2} and shows the
 * JSON form of the response in {@code mTextView}. The receiver is an anonymous
 * {@link Subject} subclass that only does work in {@code onNext}.
 */
private void testRx2() {
    final Subject<GankAndroid> responseSink = new Subject<GankAndroid>() {
        // --- Observer callbacks: only onNext does any work ---
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(GankAndroid response) {
            mTextView.setText(JSON.toJSONString(response));
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }

        // --- Subject state queries: fixed stub answers ---
        @Override
        public boolean hasObservers() {
            return false;
        }

        @Override
        public boolean hasThrowable() {
            return false;
        }

        @Override
        public boolean hasComplete() {
            return false;
        }

        @Override
        public Throwable getThrowable() {
            return null;
        }

        @Override
        protected void subscribeActual(Observer<? super GankAndroid> observer) {
        }
    };

    OKHttpUtilsRx2.INSTANCE.getApi().getGankAndroid()
            .compose(OKHttpUtilsRx2.INSTANCE.<GankAndroid>IoMain())
            .subscribe(responseSink);
}
/**
 * Runs a blocking sync for {@code account}.
 *
 * <p>Returns immediately while a login is in progress. Otherwise it marks the
 * account as syncing, then drives a chain of {@link SyncAction}s: each
 * successfully processed action pushes its successor into {@code syncActions},
 * until {@code SyncBundle::isNotFinished} reports false. Failures are retried
 * up to {@code MAX_SYNC_ERRORS} times with a {@code WAIT_BEFORE_NEXT_TRY}
 * second pause, unless the error is a non-repeatable {@code RestCallException}.
 * {@code AccountDeletedException} and {@code IllegalStateException} thrown
 * anywhere in the method are ignored.
 */
@Override
public void onPerformSync(Account account, Bundle extras, String authority, ContentProviderClient providerClient, SyncResult syncResult) {
try {
final MovirtAccount movirtAccount = accountManagerHelper.asMoAccount(account);
final AccountEnvironment environment = environmentStore.getEnvironment(movirtAccount);
if (environment.isLoginInProgress()) {
// sync will be called again if the login succeeds
return;
}
// onPerformSync calls are guaranteed to be serialized for the same account
// so progress is atomic
setSyncInProgress(movirtAccount, true);
// remember last used sync action, because we may try it again if it fails
final Subject<SyncAction> syncActions = BehaviorSubject.createDefault(SyncAction.getFirstAction());
final Observable<LoginStatus> loginStatus = rxStore.isLoginInProgressObservable(movirtAccount)
.observeOn(Schedulers.newThread());
// pair the current sync action with the latest login status
Observable.combineLatest(syncActions, loginStatus, SyncBundle::new)
.doOnNext(syncBundle -> { // sync
// event syncing is skipped when event polling is disabled in the preferences
if (syncBundle.action == SyncAction.EVENT
&& !environment.getSharedPreferencesHelper().isPollEventsEnabled()) {
return;
}
environment.getFacade(syncBundle.action.getClazz()).syncAllUnsafe();
})
.retryWhen(errors ->
// run again with another MAX_SYNC_ERRORS tries; + 1 is for signaling the last error
errors.zipWith(Observable.range(1, MAX_SYNC_ERRORS + 1), Pair::new)
.flatMap(err -> {
if (err.second == MAX_SYNC_ERRORS + 1 // last try failed
// or unrecoverable exception
|| ((err.first instanceof RestCallException) && !((RestCallException) err.first).isRepeatable())) {
return Observable.<Long>error(err.first); // cancel the sync
}
// wait few seconds before trying again
Log.d(TAG, String.format("Account %s: failed sync. Retrying...", movirtAccount.getName()));
return Observable.timer(WAIT_BEFORE_NEXT_TRY, TimeUnit.SECONDS);
}))
// clear the in-progress flag however the stream terminates
.doFinally(() -> setSyncInProgress(movirtAccount, false))
// finish if there is no reason to continue syncing
.takeWhile(SyncBundle::isNotFinished)
// block onPerformSync method -> the sync status will be atomic
.blockingSubscribe(syncBundle -> syncActions.onNext(syncBundle.action.getNextAction()), // continue sync with next action
throwable -> {
// android can interrupt us while we sleep, probably because the same sync is pending; so ignore this one
if (!(throwable instanceof InterruptedException)) {
// if not, first real error is handled (depends on MAX_SYNC_ERRORS)
environment.getRestErrorHandler().handleError(throwable, "Sync failed. ");
}
});
} catch (AccountDeletedException | IllegalStateException ignore) {
// NOTE(review): deliberately swallowed by the original author; the reason is not visible here
}
}
/**
 * Returns the subject held in the {@code mLifecycleSubject} field.
 *
 * @return the {@link ActivityEvent} subject of this object
 */
@NonNull
@Override
public final Subject<ActivityEvent> provideLifecycleSubject() {
return mLifecycleSubject;
}
/**
 * Returns the subject held in the {@code mLifecycleSubject} field.
 *
 * @return the {@link FragmentEvent} subject of this object
 */
@NonNull
@Override
public final Subject<FragmentEvent> provideLifecycleSubject() {
return mLifecycleSubject;
}