类io.reactivex.subjects.Subject源码实例Demo

下面列出了怎么用io.reactivex.subjects.Subject的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: jobson   文件: JobsDAOTest.java
/**
 * Checks that disposing the handle returned by {@code appendStdout} prevents the
 * stdout observable from being read when the underlying subject emits afterwards.
 */
@Test
public void testPersistStdoutReturnsADisposableThatStopsFurtherReads() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final AtomicBoolean stdoutObsWasRead = new AtomicBoolean(false);
    // The map step only runs when an item is actually delivered to a subscriber,
    // so the flag records whether anything read from the observable.
    final Observable<byte[]> stdoutObs = stdoutSubject.map(data -> {
        stdoutObsWasRead.set(true);
        return data;
    });

    final Disposable disposable = dao.appendStdout(jobId, stdoutObs);
    disposable.dispose();
    stdoutSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; after disposal the flag must still be false.
    assertThat(stdoutObsWasRead.get()).isFalse();
}
 
源代码2 项目: jobson   文件: JobExecutorTest.java
/**
 * Checks that the stderr subject handed to the executor has received
 * {@code onComplete} by the time the execution promise resolves.
 */
@Test
public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable {
    final JobExecutor jobExecutor = getInstance();
    final AtomicBoolean completedCalled = new AtomicBoolean(false);
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe();
    final JobEventListeners listeners = createStderrListener(stderrSubject);
    final CancelablePromise<JobExecutionResult> ret =
            jobExecutor.execute(STANDARD_REQUEST, listeners);

    promiseAssert(ret, result -> {
        try {
            // The stderr thread can race with the exit thread
            Thread.sleep(50);
        } catch (InterruptedException ex) {
            // Previously the interrupt was swallowed, which also skipped the assertion
            // and let the test pass without checking anything.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for stderr to complete", ex);
        }
        assertThat(completedCalled.get()).isTrue();
    });
}
 
源代码3 项目: jobson   文件: JobExecutorTest.java
/**
 * Runs {@code echo ${inputs.foo}} through the executor and checks that the text
 * captured from stdout is {@code "a"}.
 */
@Test
public void testExecuteEvaluatesJobInputsAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${inputs.foo}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();

    // Accumulate every stdout chunk into a single byte array.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken here and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Previously the timeout result was ignored, so a stalled job would surface as a
    // confusing content mismatch below instead of a clear timeout failure.
    assertThat(s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS))
            .as("stdout should complete before the timeout")
            .isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
    assertThat(stringFromStdout).isEqualTo("a"); // from spec
}
 
源代码4 项目: jobson   文件: JobExecutorTest.java
/**
 * Runs {@code echo ${join(',', inputs.someList)}} through the executor and checks
 * that the text captured from stdout is {@code "a,b,c,d"}.
 */
@Test
public void testExecuteEvaluatesJoinAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${join(',', inputs.someList)}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    // Accumulate every stdout chunk into a single byte array.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken here and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail with an explicit timeout message rather than ignoring the wait result.
    assertThat(s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS))
            .as("stdout should complete before the timeout")
            .isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    assertThat(stringFromStdout).isEqualTo("a,b,c,d"); // From the input fixture
}
 
源代码5 项目: jobson   文件: JobExecutorTest.java
/**
 * Runs {@code echo ${toString(inputs.someString)}} through the executor and checks
 * that the text captured from stdout is {@code "hello, world!"}.
 */
@Test
public void testExecuteEvaluatesToStringAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${toString(inputs.someString)}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    // Accumulate every stdout chunk into a single byte array.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken here and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail with an explicit timeout message rather than ignoring the wait result.
    assertThat(s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS))
            .as("stdout should complete before the timeout")
            .isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    assertThat(stringFromStdout).isEqualTo("hello, world!"); // from input fixture
}
 
源代码6 项目: jobson   文件: JobExecutorTest.java
/**
 * Runs {@code echo ${outputDir}} through the executor and checks that the path
 * printed on stdout exists on the filesystem.
 */
@Test
public void testExecuteEvaluatesOutputDirAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${outputDir}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    // Accumulate every stdout chunk into a single byte array.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken here and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail with an explicit timeout message rather than ignoring the wait result.
    assertThat(s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS))
            .as("stdout should complete before the timeout")
            .isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    // A bare assertThat(boolean) checks nothing; the existence check must be asserted.
    assertThat(Files.exists(Paths.get(stringFromStdout))).isTrue();
}
 
源代码7 项目: jobson   文件: JobManagerTest.java
/**
 * Checks that bytes pushed into the executor's stderr subject are observable
 * through {@code JobManager.stderrUpdates} for the submitted job.
 */
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    final Subject<byte[]> executorStderr = PublishSubject.create();
    final CancelablePromise<JobExecutionResult> executionPromise = new SimpleCancelablePromise<>();
    final JobExecutor mockExecutor = MockJobExecutor.thatUses(
            executionPromise,
            Observable.just(TestHelpers.generateRandomBytes()),
            executorStderr);
    final JobManager manager = createManagerWith(mockExecutor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission =
            manager.submit(STANDARD_VALID_REQUEST);

    // Remember the most recent chunk delivered through the manager's stderr stream.
    final AtomicReference<byte[]> lastBytesSeen = new AtomicReference<>();
    final Observable<byte[]> managerStderr =
            manager.stderrUpdates(submission.getLeft()).get();
    managerStderr.subscribe(lastBytesSeen::set);

    final byte[] expected = TestHelpers.generateRandomBytes();
    executorStderr.onNext(expected);

    // Finish the mocked execution and wait for the job to be finalized.
    executionPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastBytesSeen.get()).isEqualTo(expected);
}
 
源代码8 项目: Reactive-Android-Programming   文件: Sandbox.java
/**
 * Demo: one {@code PublishSubject} is subscribed to two interval sources, then a
 * logging observer is attached to the subject.
 */
private static void demo2() {
    final Subject<Long> relay = PublishSubject.create();

    // Three ticks, two seconds apart.
    final Observable<Long> originOne = Observable.interval(2, TimeUnit.SECONDS)
            .take(3)
            .doOnComplete(() -> log("Origin-One-doOnComplete"));
    originOne.subscribe(relay);

    // Two ticks, one second apart.
    final Observable<Long> originTwo = Observable.interval(1, TimeUnit.SECONDS)
            .take(2)
            .doOnComplete(() -> log("Origin-Two-doOnComplete"));
    originTwo.subscribe(relay);

    final Observable<Long> observed =
            relay.doOnComplete(() -> log("First-doOnComplete"));
    observed.subscribe(v -> log(v));
}
 
源代码9 项目: Reactive-Android-Programming   文件: Sandbox.java
/**
 * Demo: a {@code PublishSubject} fed by an interval source gets one observer
 * immediately and a second observer after a 4.1 second pause.
 *
 * @throws InterruptedException if the pause between the two subscriptions is interrupted
 */
private static void demo1() throws InterruptedException {
    final Subject<Long> relay = PublishSubject.create();

    // Five ticks, two seconds apart, forwarded into the subject.
    final Observable<Long> origin = Observable.interval(2, TimeUnit.SECONDS)
            .take(5)
            .doOnSubscribe((d) -> log("Original-doOnSubscribe"))
            .doOnComplete(() -> log("Original-doOnComplete"));
    origin.subscribe(relay);

    final Observable<Long> firstView = relay
            .doOnSubscribe((d) -> log("First-doOnSubscribe"))
            .doOnComplete(() -> log("First-doOnComplete"));
    firstView.subscribe(v -> log("First: " + v));

    Thread.sleep(4100);

    final Observable<Long> secondView = relay
            .doOnSubscribe((d) -> log("Second-doOnSubscribe"))
            .doOnComplete(() -> log("Second-doOnComplete"));
    secondView.subscribe(v -> log("Second: " + v));
}
 
源代码10 项目: quill   文件: AuthService.java
/**
 * Revokes the refresh token and then, once that revocation completes, the access token.
 * Both calls are given the same subject, which is returned to the caller.
 *
 * @param token        token pair to revoke; its auth header is used for both requests
 * @param clientSecret client secret placed into both revocation request bodies
 * @return the subject passed to both {@code revokeSingleToken} calls
 */
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) {
    // Ordering matters: the access token authorizes both revocations, so it has to
    // be revoked last, after the refresh token is gone.
    final Subject<JsonElement> revocationResponses = PublishSubject.create();
    final RevokeReqBody refreshTokenBody =
            RevokeReqBody.fromRefreshToken(token.getRefreshToken(), clientSecret);

    revokeSingleToken(token.getAuthHeader(), refreshTokenBody, revocationResponses)
            .doOnComplete(() -> {
                // Second step, started only after the refresh-token call completed.
                final RevokeReqBody accessTokenBody =
                        RevokeReqBody.fromAccessToken(token.getAccessToken(), clientSecret);
                revokeSingleToken(token.getAuthHeader(), accessTokenBody, revocationResponses)
                        .subscribe();
            })
            .subscribe();

    return revocationResponses;
}
 
源代码11 项目: t-io   文件: WsClientAioHander.java
/**
 * Forwards the packet to the subject stored on the channel context under
 * {@code WebSocketImpl.packetPublisherKey}. A {@code WsPacket} whose
 * {@code isWsEof()} is false is dropped without being forwarded.
 */
@Override
public void handler(Packet packet, ChannelContext ctx) throws Exception {
  final boolean wsPacketWithoutEof =
      packet instanceof WsPacket && !((WsPacket) packet).isWsEof();
  if (wsPacketWithoutEof) {
    return;
  }

  final Object attribute = ctx.getAttribute(WebSocketImpl.packetPublisherKey);
  final Subject<Packet> publisher = (Subject<Packet>) attribute;
  publisher.onNext(packet);
}
 
源代码12 项目: jobson   文件: JobManager.java
/**
 * Takes the next job off the queue, if there is one, and submits it to the executor.
 *
 * The job's stdout/stderr subjects are registered with the DAO and with the
 * queued job's listeners before execution starts. If starting the job throws,
 * the failure is logged and the job is marked {@code FATAL_ERROR}.
 */
private void advanceJobQueue() {
    final QueuedJob queuedJob = jobQueue.poll();

    if (queuedJob == null) return;

    final Subject<byte[]> stdout = PublishSubject.create();
    final Subject<byte[]> stderr = PublishSubject.create();

    jobDAO.appendStdout(queuedJob.getId(), stdout);
    jobDAO.appendStderr(queuedJob.getId(), stderr);

    stdout.subscribe(queuedJob.getQueuedListeners().getOnStdoutListener());
    stderr.subscribe(queuedJob.getQueuedListeners().getOnStderrListener());

    try {
        final CancelablePromise<JobExecutionResult> executionPromise =
                jobExecutor.execute(queuedJob, JobEventListeners.create(stdout, stderr));

        final ExecutingJob executingJob =
                ExecutingJob.fromQueuedJob(queuedJob, now(), stdout, stderr);

        executingJobs.put(executingJob.getId(), executingJob);

        updateJobStatus(queuedJob.getId(), RUNNING, "Submitted to executor");

        executionPromise.thenAccept(res -> {
            onExecutionFinished(executingJob, res);
        });

        // Cancelling the job's completion promise cancels the underlying execution.
        executingJob.getCompletionPromise().onCancel(() -> {
            executionPromise.cancel(true);
        });
    } catch (Throwable ex) {
        // Pass the throwable itself so the stack trace is logged, not just its toString().
        log.error("Error starting job execution: " + ex.toString(), ex);
        updateJobStatus(queuedJob.getId(), FATAL_ERROR, "Error executing job: " + ex.toString());
    }
}
 
源代码13 项目: jobson   文件: JobsDAOTest.java
/**
 * Checks that {@code appendStdout} reads the data emitted by the observable it is given.
 */
@Test
public void testPersistStdoutReadsDataFromObservable() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final AtomicBoolean stdoutObsWasRead = new AtomicBoolean(false);
    // The map step only runs when an item is delivered to a subscriber.
    final Observable<byte[]> stdoutObs = stdoutSubject.map(data -> {
        stdoutObsWasRead.set(true);
        return data;
    });

    dao.appendStdout(jobId, stdoutObs);

    // Without an emission the map step can never run, so the original test had nothing
    // to observe. NOTE(review): assumes the DAO subscribes synchronously inside
    // appendStdout, so the flag is set by the time onNext returns - confirm.
    stdoutSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; assert the flag explicitly.
    assertThat(stdoutObsWasRead.get()).isTrue();
}
 
源代码14 项目: jobson   文件: JobsDAOTest.java
/**
 * Checks that {@code appendStderr} reads the data emitted by the observable it is given.
 */
@Test
public void testPersistStderrReadsDataFromObservable() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
    // The map step only runs when an item is delivered to a subscriber.
    final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
        stderrObsWasRead.set(true);
        return data;
    });

    dao.appendStderr(jobId, stderrObs);

    // Without an emission the map step can never run, so the original test had nothing
    // to observe. NOTE(review): assumes the DAO subscribes synchronously inside
    // appendStderr, so the flag is set by the time onNext returns - confirm.
    stderrSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; assert the flag explicitly.
    assertThat(stderrObsWasRead.get()).isTrue();
}
 
源代码15 项目: SweetMusicPlayer   文件: RxBusOlder.java
/**
 * Removes {@code observable} from the list registered under {@code tag}; when that
 * list becomes empty the tag's entry is removed from the mapper as well.
 * Does nothing if no list is registered for the tag.
 *
 * @param tag        key under which the subject was registered
 * @param observable the instance to remove from the tag's list
 */
public void unregister(@NonNull Object tag, @NonNull Observable observable) {
    final List<Subject> registered = mSubjectsMapper.get(tag);
    if (registered == null) {
        return;
    }

    registered.remove(observable);
    if (registered.isEmpty()) {
        mSubjectsMapper.remove(tag);
    }

    if (DEBUG) {
        Timber.d("[unregister] mSubjectsMapper: " + mSubjectsMapper);
    }
}
 
源代码16 项目: jobson   文件: JobExecutorTest.java
/**
 * Runs {@code echo ${toFile(toJSON(inputs))}} through the executor, then checks that
 * the path printed on stdout exists and that its contents equal the JSON form of
 * the standard request's inputs.
 */
@Test
public void testExecuteEvaluatesToFileAsExpected() throws InterruptedException, IOException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${toFile(toJSON(inputs))}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();

    // Accumulate every stdout chunk into a single byte array.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken here and handed back when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail with an explicit timeout message rather than ignoring the wait result.
    assertThat(s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS))
            .as("stdout should complete before the timeout")
            .isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();
    final Path p = Paths.get(stringFromStdout);

    // A bare assertThat(boolean) checks nothing; the existence check must be asserted.
    assertThat(p.toFile().exists()).isTrue();

    final String loadedJson = new String(Files.readAllBytes(p));

    TestHelpers.assertJSONEqual(loadedJson, toJSON(STANDARD_REQUEST.getInputs()));
}
 
源代码17 项目: jobson   文件: JobManagerTest.java
/**
 * Checks that a stdout listener supplied at submission time receives the bytes
 * replayed by the mock executor's stdout subject.
 */
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
    // Permit is taken up front and handed back by the listener callbacks.
    final Semaphore bytesReceived = new Semaphore(1);
    bytesReceived.acquire();

    // Pre-load the replay subject so the bytes are delivered on subscription.
    final byte[] expectedStdoutBytes = generateRandomBytes();
    final Subject<byte[]> executorStdout = ReplaySubject.create();
    executorStdout.onNext(expectedStdoutBytes);

    final JobExecutor mockExecutor = MockJobExecutor.thatUses(executorStdout, Observable.never());
    final JobManager manager = createManagerWith(mockExecutor);

    final Observer<byte[]> stdoutObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(expectedStdoutBytes);
            bytesReceived.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            fail("Error from observable");
            bytesReceived.release();
        }

        @Override
        public void onComplete() {}
    };

    manager.submit(STANDARD_VALID_REQUEST, JobEventListeners.createStdoutListener(stdoutObserver));

    final boolean gotBytes = bytesReceived.tryAcquire(1, SECONDS);
    if (!gotBytes) {
        fail("Timed out before any bytes received");
    }
}
 
源代码18 项目: jobson   文件: JobManagerTest.java
/**
 * Checks that a stderr listener supplied at submission time receives the bytes
 * replayed by the mock executor's stderr subject.
 */
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
    // Permit is taken up front and handed back by the listener callbacks.
    final Semaphore bytesReceived = new Semaphore(1);
    bytesReceived.acquire();

    // Pre-load the replay subject so the bytes are delivered on subscription.
    final byte[] expectedStderrBytes = generateRandomBytes();
    final Subject<byte[]> executorStderr = ReplaySubject.create();
    executorStderr.onNext(expectedStderrBytes);

    final JobExecutor mockExecutor = MockJobExecutor.thatUses(Observable.never(), executorStderr);
    final JobManager manager = createManagerWith(mockExecutor);

    final Observer<byte[]> stderrObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(expectedStderrBytes);
            bytesReceived.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            fail("Error from observable");
            bytesReceived.release();
        }

        @Override
        public void onComplete() {}
    };

    manager.submit(STANDARD_VALID_REQUEST, JobEventListeners.createStderrListener(stderrObserver));

    final boolean gotBytes = bytesReceived.tryAcquire(1, SECONDS);
    if (!gotBytes) {
        fail("Timed out before any bytes received");
    }
}
 
源代码19 项目: Toutiao   文件: RxBus.java
/**
 * Creates a new {@code PublishSubject}, adds it to the list stored under {@code tag}
 * (creating and storing that list first if the tag has none), and returns it.
 *
 * @param tag key under which the new subject is registered
 * @param <T> element type of the returned observable
 * @return the newly created subject
 */
@NonNull
public <T> Observable<T> register(@NonNull Object tag) {
    final Subject<T> created = PublishSubject.create();

    List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // First registration for this tag: create and store its list.
        registered = new ArrayList<>();
        subjectMapper.put(tag, registered);
    }
    registered.add(created);

    return created;
}
 
源代码20 项目: Toutiao   文件: RxBus.java
/**
 * Removes {@code observable} from the list registered under {@code tag}; when that
 * list becomes empty the tag's entry is removed from the mapper as well.
 * Does nothing if no list is registered for the tag.
 *
 * @param tag        key under which the subject was registered
 * @param observable the instance to remove from the tag's list
 */
public void unregister(@NonNull Object tag, @NonNull Observable observable) {
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        return;
    }

    registered.remove(observable);
    if (registered.isEmpty()) {
        // Last subject for this tag is gone, so drop the tag itself.
        subjectMapper.remove(tag);
    }
}
 
源代码21 项目: Toutiao   文件: RxBus.java
/**
 * Emits {@code content} to every subject registered under {@code tag}.
 * Does nothing when the tag has no registered subjects.
 *
 * @param tag     key whose registered subjects should receive the event
 * @param content value passed to each subject's {@code onNext}
 */
public void post(@NonNull Object tag, @NonNull Object content) {
    List<Subject> subjects = subjectMapper.get(tag);
    // The lookup result is null-checked in unregister, so a tag may have no list;
    // guard here too instead of throwing a NullPointerException.
    if (subjects == null || subjects.isEmpty()) {
        return;
    }
    for (Subject subject : subjects) {
        subject.onNext(content);
    }
}
 
源代码22 项目: symbol-sdk-java   文件: ListenerBase.java
/**
 * Returns the {@code messageSubject} field.
 *
 * @return the subject of {@code ListenerMessage} values held in {@code messageSubject}
 */
public Subject<ListenerMessage> getMessageSubject() {
    return messageSubject;
}
 
源代码23 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
/**
 * Subscribes a hand-written {@code Subject} to three 100 ms interval ticks wrapped by
 * {@code timeInterval()}, printing each timed value, then sleeps for three seconds.
 */
@Test
public void testTimeIntervalObservable() throws InterruptedException {

    Observable.interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .timeInterval()
            .subscribe(new Subject<Timed<Long>>() {
                // ---- Observer callbacks: only onNext does any work ----

                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    System.out.println("onNext: " + longTimed);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }

                // ---- Subject state queries: fixed stub answers ----

                @Override
                public boolean hasObservers() {
                    return false;
                }

                @Override
                public boolean hasThrowable() {
                    return false;
                }

                @Override
                public boolean hasComplete() {
                    return false;
                }

                @Override
                public Throwable getThrowable() {
                    return null;
                }

                // ---- Observable side: subscribers are never attached ----

                @Override
                protected void subscribeActual(Observer<? super Timed<Long>> observer) {

                }
            });

    Thread.sleep(3000);
}
 
源代码24 项目: homeassist   文件: MainActivity.java
/**
 * Returns the {@code mEventEmitter} field.
 *
 * @return the subject of {@code RxPayload} values held in {@code mEventEmitter}
 */
@Override
public Subject<RxPayload> getEventSubject() {
    return mEventEmitter;
}
 
源代码25 项目: homeassist   文件: TransparentActivity.java
/**
 * Returns the {@code mEventEmitter} field.
 *
 * @return the subject of {@code RxPayload} values held in {@code mEventEmitter}
 */
@Override
public Subject<RxPayload> getEventSubject() {
    return mEventEmitter;
}
 
源代码26 项目: homeassist   文件: DataSyncService.java
/**
 * Returns the {@code mEventEmitter} field.
 *
 * @return the subject of {@code RxPayload} values held in {@code mEventEmitter}
 */
public Subject<RxPayload> getEventSubject() {
    return mEventEmitter;
}
 
源代码27 项目: RetrofitCache   文件: MainActivity.java
/**
 * Requests {@code getGankAndroid()} through {@code OKHttpUtilsRx2}, applies the
 * {@code IoMain()} transformer, and shows each result as JSON text in {@code mTextView}.
 */
private void testRx2(){
    OKHttpUtilsRx2.INSTANCE.getApi().getGankAndroid()
            .compose(OKHttpUtilsRx2.INSTANCE.<GankAndroid>IoMain())
            .subscribe(new Subject<GankAndroid>() {
                // ---- Observer callbacks: only onNext does any work ----

                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(GankAndroid gankAndroid) {
                    mTextView.setText(JSON.toJSONString(gankAndroid));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }

                // ---- Subject state queries: fixed stub answers ----

                @Override
                public boolean hasObservers() {
                    return false;
                }

                @Override
                public boolean hasThrowable() {
                    return false;
                }

                @Override
                public boolean hasComplete() {
                    return false;
                }

                @Override
                public Throwable getThrowable() {
                    return null;
                }

                // ---- Observable side: subscribers are never attached ----

                @Override
                protected void subscribeActual(Observer<? super GankAndroid> observer) {

                }
            });
}
 
源代码28 项目: moVirt   文件: SyncAdapter.java
/**
 * Runs a blocking sync for the given account.
 *
 * Returns immediately if the account's environment reports a login in progress.
 * Otherwise marks the sync as in progress and drives a sequence of
 * {@code SyncAction}s, starting from {@code SyncAction.getFirstAction()}: each
 * emitted action triggers {@code syncAllUnsafe()} on the facade for that action's
 * class, and each successful step feeds {@code getNextAction()} back into the
 * subject. Errors are retried after {@code WAIT_BEFORE_NEXT_TRY} seconds, up to
 * {@code MAX_SYNC_ERRORS} times, unless the error is a non-repeatable
 * {@code RestCallException}. A final error other than {@code InterruptedException}
 * is passed to the environment's REST error handler.
 *
 * Only {@code account} is read by this method; the remaining parameters are unused here.
 *
 * @param account        account to sync
 * @param extras         not read by this method
 * @param authority      not read by this method
 * @param providerClient not read by this method
 * @param syncResult     not read by this method
 */
@Override
public void onPerformSync(Account account, Bundle extras, String authority, ContentProviderClient providerClient, SyncResult syncResult) {
    try {
        final MovirtAccount movirtAccount = accountManagerHelper.asMoAccount(account);
        final AccountEnvironment environment = environmentStore.getEnvironment(movirtAccount);

        if (environment.isLoginInProgress()) {
            // sync will be called again if the login succeeds
            return;
        }
        // onPerformSync calls are guaranteed to be serialized for the same account
        // so progress is atomic
        setSyncInProgress(movirtAccount, true);

        // remember last used sync action, because we may try it again if it fails
        final Subject<SyncAction> syncActions = BehaviorSubject.createDefault(SyncAction.getFirstAction());
        final Observable<LoginStatus> loginStatus = rxStore.isLoginInProgressObservable(movirtAccount)
                .observeOn(Schedulers.newThread());

        // every new action or login status produces a SyncBundle that drives one sync step
        Observable.combineLatest(syncActions, loginStatus, SyncBundle::new)
                .doOnNext(syncBundle -> { // sync
                    // event polling can be switched off in preferences; skip the EVENT action then
                    if (syncBundle.action == SyncAction.EVENT
                            && !environment.getSharedPreferencesHelper().isPollEventsEnabled()) {
                        return;
                    }
                    environment.getFacade(syncBundle.action.getClazz()).syncAllUnsafe();
                })
                .retryWhen(errors ->
                        // run again with another MAX_SYNC_ERRORS tries; + 1 is for signaling the last error
                        errors.zipWith(Observable.range(1, MAX_SYNC_ERRORS + 1), Pair::new)
                                .flatMap(err -> {
                                    if (err.second == MAX_SYNC_ERRORS + 1 // last try failed
                                            // or unrecoverable exception
                                            || ((err.first instanceof RestCallException) && !((RestCallException) err.first).isRepeatable())) {
                                        return Observable.<Long>error(err.first); // cancel the sync
                                    }
                                    // wait few seconds before trying again
                                    Log.d(TAG, String.format("Account %s: failed sync. Retrying...", movirtAccount.getName()));
                                    return Observable.timer(WAIT_BEFORE_NEXT_TRY, TimeUnit.SECONDS);
                                }))
                // clear the in-progress flag however the pipeline ends
                .doFinally(() -> setSyncInProgress(movirtAccount, false))
                //  finish if there is no reason to continue syncing
                .takeWhile(SyncBundle::isNotFinished)
                // block onPerformSync method -> the sync status will be atomic
                .blockingSubscribe(syncBundle -> syncActions.onNext(syncBundle.action.getNextAction()), // continue sync with next action
                        throwable -> {
                            // android can interrupt us while we sleep, probably because the same sync is pending; so ignore this one
                            if (!(throwable instanceof InterruptedException)) {
                                // if not, first real error is handled (depends on MAX_SYNC_ERRORS)
                                environment.getRestErrorHandler().handleError(throwable, "Sync failed. ");
                            }
                        });
    } catch (AccountDeletedException | IllegalStateException ignore) {
        // NOTE(review): both exception types are deliberately ignored here; the reason
        // is not visible in this method - presumably the account or its environment
        // went away mid-sync. Confirm before changing.
    }
}
 
源代码29 项目: Aurora   文件: BaseActivity.java
/**
 * Returns the {@code mLifecycleSubject} field.
 *
 * @return the subject of {@code ActivityEvent} values held in {@code mLifecycleSubject}
 */
@NonNull
@Override
public final Subject<ActivityEvent> provideLifecycleSubject() {
    return mLifecycleSubject;
}
 
源代码30 项目: Aurora   文件: BaseFragment.java
/**
 * Returns the {@code mLifecycleSubject} field.
 *
 * @return the subject of {@code FragmentEvent} values held in {@code mLifecycleSubject}
 */
@NonNull
@Override
public final Subject<FragmentEvent> provideLifecycleSubject() {
    return mLifecycleSubject;
}
 
 类所在包
 同包方法