类io.reactivex.internal.functions.Functions源码实例Demo

下面列出了怎么用io.reactivex.internal.functions.Functions的API类实例代码及写法,或者点击链接到GitHub查看源代码。

源代码1 项目: vertx-in-action   文件: DashboardWebAppVerticle.java
/**
 * Fetches the last-24-hours ranking from the local service on port 3001 and
 * feeds every entry through the ownership and user-profile lookups before
 * handing it to {@code hydrateEntryIfPublic}.
 *
 * The HTTP response is delayed by five seconds on the Vert.x scheduler and the
 * request is retried up to five times on failure. Errors and completion are
 * only logged.
 */
private void hydrate() {
  WebClient client = WebClient.create(vertx);
  client
    .get(3001, "localhost", "/ranking-last-24-hours")
    .as(BodyCodec.jsonArray())
    .rxSend()
    .delay(5, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
    .retry(5)
    .map(response -> response.body())
    // Flatten the JSON array body into a stream of its elements.
    .flattenAsFlowable(Functions.identity())
    .cast(JsonObject.class)
    .flatMapSingle(entry -> whoOwnsDevice(client, entry))
    .flatMapSingle(entry -> fillWithUserProfile(client, entry))
    .subscribe(
      entry -> hydrateEntryIfPublic(entry),
      failure -> logger.error("Hydratation error", failure),
      () -> logger.info("Hydratation completed"));
}
 
源代码2 项目: RxAndroidBle   文件: ServiceDiscoveryManager.java
/**
 * Clears the cached-results flag and rebuilds {@code deviceServicesObservable}.
 *
 * The new pipeline reads the services from GATT, falls back to a discovery
 * with timeout when that source is empty, marks the results as cached on
 * success, calls {@code reset()} again on error, and caches its outcome.
 */
void reset() {
    hasCachedResults = false;

    // Side effect to run once services have been obtained successfully.
    final Action markResultsCached = new Action() {
        @Override
        public void run() {
            hasCachedResults = true;
        }
    };

    // Side effect to run on failure: rebuild the pipeline from scratch.
    final Action rebuildOnFailure = new Action() {
        @Override
        public void run() {
            reset();
        }
    };

    this.deviceServicesObservable = getListOfServicesFromGatt()
            .map(wrapIntoRxBleDeviceServices())
            .switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
            .doOnSuccess(Functions.actionConsumer(markResultsCached))
            .doOnError(Functions.actionConsumer(rebuildOnFailure))
            .cache();
}
 
源代码3 项目: a   文件: MApplication.java
/**
 * Application start-up hook: stores the singleton instance, installs the crash
 * handler and a no-op RxJava error handler, reads the package version, loads
 * the "CONFIG" preferences, resolves the download path, applies theme settings
 * and registers a foreground/background listener.
 */
@Override
public void onCreate() {
    super.onCreate();
    instance = this;
    CrashHandler.getInstance().init(this);
    // Global RxJava errors that reach the plugin handler are deliberately dropped.
    RxJavaPlugins.setErrorHandler(Functions.emptyConsumer());
    try {
        versionCode = getPackageManager().getPackageInfo(getPackageName(), 0).versionCode;
        versionName = getPackageManager().getPackageInfo(getPackageName(), 0).versionName;
    } catch (PackageManager.NameNotFoundException e) {
        // Fall back to placeholder version values when the package lookup fails.
        versionCode = 0;
        versionName = "0.0.0";
    }
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
        createChannelId();
    }
    configPreferences = getSharedPreferences("CONFIG", 0);
    downloadPath = configPreferences.getString(getString(R.string.pk_download_path), "");
    // Logical (short-circuit) OR: the cache-path comparison is skipped when the path is empty.
    if (TextUtils.isEmpty(downloadPath) || Objects.equals(downloadPath, FileHelp.getCachePath())) {
        // NOTE(review): passing null presumably selects the default download path - confirm in setDownloadPath.
        setDownloadPath(null);
    }
    initNightTheme();
    if (!ThemeStore.isConfigured(this, versionCode)) {
        upThemeStore();
    }
    AppFrontBackHelper.getInstance().register(this, new AppFrontBackHelper.OnAppStatusListener() {
        @Override
        public void onFront() {
            //donateHb = System.currentTimeMillis() - configPreferences.getLong("DonateHb", 0) <= TimeUnit.DAYS.toMillis(7);
        }

        @Override
        public void onBack() {
            UpLastChapterModel.destroy();
        }
    });
    upEInkMode();
}
 
源代码4 项目: RxBus   文件: FlowableUtils.java
/**
 * Subscribes to {@code flowable} with the given item and error callbacks.
 *
 * Delegates to the five-argument {@code subscribe} overload, supplying
 * {@code Functions.EMPTY_ACTION} as the completion action and
 * {@code FlowableInternalHelper.RequestMax.INSTANCE} as the final argument.
 *
 * @param flowable the source to subscribe to
 * @param onNext   callback for emitted items
 * @param onError  callback for errors
 * @return the value returned by the delegated {@code subscribe} call
 */
public static <T> Disposable subscribe(Flowable<T> flowable,
                                       Consumer<? super T> onNext,
                                       Consumer<? super Throwable> onError) {
    return subscribe(flowable,
            onNext, onError,
            Functions.EMPTY_ACTION,
            FlowableInternalHelper.RequestMax.INSTANCE
    );
}
 
/**
 * Binds the details view model for the movie with the given id, starts loading
 * its details, and then loads similar movies.
 *
 * Does nothing when the factory returns no view model for {@code id}. A
 * failure while loading details is logged and shown to the user as a toast.
 *
 * @param id identifier passed to the view-model factory
 */
private void showMovieDetails(final int id) {
    final MovieDetailsViewModel viewModel = viewModelFactory.createMovieDetailsViewModelById(id);
    if (viewModel != null) {
        binding.setMovie(viewModel);

        compositeDisposable.add(
                viewModel.loadDetails().subscribe(
                        // Completion needs no handling.
                        Functions.EMPTY_ACTION,
                        throwable -> {
                            Log.w(TAG, "showMovieDetails onError: " + throwable.toString());
                            Toast.makeText(this, R.string.error_fetch_movie_details, LENGTH_LONG).show();
                        }));

        loadSimilarMovies(viewModel);
    }
}
 
源代码6 项目: MyBookshelf   文件: MApplication.java
/**
 * Application start-up hook: stores the singleton instance, installs the crash
 * handler and a no-op RxJava error handler, reads the package version, loads
 * the "CONFIG" preferences, resolves the download path, applies theme settings
 * and registers a foreground/background listener.
 */
@Override
public void onCreate() {
    super.onCreate();
    instance = this;
    CrashHandler.getInstance().init(this);
    // Global RxJava errors that reach the plugin handler are deliberately dropped.
    RxJavaPlugins.setErrorHandler(Functions.emptyConsumer());
    try {
        versionCode = getPackageManager().getPackageInfo(getPackageName(), 0).versionCode;
        versionName = getPackageManager().getPackageInfo(getPackageName(), 0).versionName;
    } catch (PackageManager.NameNotFoundException e) {
        // Fall back to placeholder version values when the package lookup fails.
        versionCode = 0;
        versionName = "0.0.0";
    }
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
        createChannelId();
    }
    configPreferences = getSharedPreferences("CONFIG", 0);
    downloadPath = configPreferences.getString(getString(R.string.pk_download_path), "");
    // Logical (short-circuit) OR: the cache-path comparison is skipped when the path is empty.
    if (TextUtils.isEmpty(downloadPath) || Objects.equals(downloadPath, FileHelp.getCachePath())) {
        // NOTE(review): passing null presumably selects the default download path - confirm in setDownloadPath.
        setDownloadPath(null);
    }
    initNightTheme();
    if (!ThemeStore.isConfigured(this, versionCode)) {
        upThemeStore();
    }
    AppFrontBackHelper.getInstance().register(this, new AppFrontBackHelper.OnAppStatusListener() {
        @Override
        public void onFront() {
            // True while the stored "DonateHb" timestamp is at most 30 days old.
            donateHb = System.currentTimeMillis() - configPreferences.getLong("DonateHb", 0) <= TimeUnit.DAYS.toMillis(30);
        }

        @Override
        public void onBack() {
            UpLastChapterModel.destroy();
        }
    });
    upEInkMode();
}
 
源代码7 项目: RxAndroidBle   文件: DisconnectAction.java
/**
 * Queues {@code operationDisconnect} on the client operation queue and
 * subscribes with two no-op consumers, so both emitted values and errors
 * from the queued operation are ignored.
 */
@Override
public void onConnectionUnsubscribed() {
    clientOperationQueue
            .queue(operationDisconnect)
            .subscribe(
                    Functions.emptyConsumer(),
                    Functions.emptyConsumer()
            );
}
 
源代码8 项目: RxAndroidBle   文件: RxBleConnectionMock.java
/**
 * Removes the entry for {@code characteristicUuid} from
 * {@code notificationObservableMap} and then invokes
 * {@code setupCharacteristicNotification} with {@code false} as its third
 * argument, ignoring both completion and errors of that call.
 *
 * NOTE(review): the {@code false} argument presumably disables the
 * notification - confirm against {@code setupCharacteristicNotification}.
 */
private void dismissCharacteristicNotification(UUID characteristicUuid, NotificationSetupMode setupMode, boolean isIndication) {
    notificationObservableMap.remove(characteristicUuid);
    setupCharacteristicNotification(characteristicUuid, setupMode, false, isIndication)
            .subscribe(
                    Functions.EMPTY_ACTION,
                    Functions.emptyConsumer()
            );
}
 
源代码9 项目: rxjava2-extras   文件: RetryWhen.java
/**
 * Builds a function that turns an {@code ErrorAndDuration} into a delayed
 * re-emission of itself.
 *
 * When the duration equals {@code NO_MORE_DELAYS} the returned flowable fails
 * with the wrapped throwable; otherwise it emits the same
 * {@code ErrorAndDuration} after the given number of milliseconds on
 * {@code scheduler}.
 *
 * @param scheduler scheduler used for the timer
 * @return the delaying function
 */
private static Function<ErrorAndDuration, Flowable<ErrorAndDuration>> delay(final Scheduler scheduler) {
    return new Function<ErrorAndDuration, Flowable<ErrorAndDuration>>() {
        @Override
        public Flowable<ErrorAndDuration> apply(ErrorAndDuration failure) {
            if (failure.durationMs() != NO_MORE_DELAYS) {
                // Wait for the requested time, then re-emit the same value.
                return Flowable.timer(failure.durationMs(), TimeUnit.MILLISECONDS, scheduler)
                        .map(com.github.davidmoten.rx2.Functions.constant(failure));
            }
            // Delays exhausted: propagate the original error.
            return Flowable.error(failure.throwable());
        }
    };
}
 
源代码10 项目: rxjava2-extras   文件: FlowableServerSocketTest.java
/**
 * Checks that the subscriber receives no values when the server socket is
 * built with {@code acceptSocketIf(Functions.alwaysFalse())}.
 *
 * The test starts the server on an auto-allocated port, connects a client
 * socket, writes 20 bytes, closes the client, waits, and asserts that the
 * {@code TestSubscriber} saw no values.
 */
@Test
public void testAcceptSocketRejectsAlways() throws UnknownHostException, IOException, InterruptedException {
    reset();
    TestSubscriber<Object> ts = TestSubscriber.create();
    try {
        int bufferSize = 4;
        // Read below to connect the client; presumably filled in via Consumers.set(port).
        AtomicInteger port = new AtomicInteger();
        IO.serverSocketAutoAllocatePort(Consumers.set(port)) //
                .readTimeoutMs(10000) //
                .acceptTimeoutMs(200) //
                .bufferSize(bufferSize) //
                .acceptSocketIf(Functions.alwaysFalse()) //
                .create() //
                .subscribeOn(scheduler) //
                .subscribe(ts);
        // Give the server time to start before connecting.
        Thread.sleep(300);
        Socket socket = new Socket(LOCALHOST, port.get());
        OutputStream out = socket.getOutputStream();
        out.write("12345678901234567890".getBytes());
        out.close();
        socket.close();
        // Allow time for any (unexpected) emission before asserting.
        Thread.sleep(1000);
        ts.assertNoValues();
    } finally {
        // will close server socket
        ts.dispose();
    }
}
 
源代码11 项目: state-machine   文件: Processor.java
/**
 * Builds, lazily per subscription, the flowable of entity state machines.
 *
 * For each subscription a worker is created from {@code signalScheduler}; the
 * serialized subject is merged with {@code signals}, passed through
 * {@code preGroupBy}, and grouped by a {@code ClassId} built from each
 * signal's class and id. Every group is processed on
 * {@code processingScheduler}, its resulting state machines are stored in
 * {@code stateMachines}, and the group is finally passed through
 * {@code entityTransform}.
 *
 * @return a deferred flowable of the state machines produced by processing signals
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
    return Flowable.defer(() -> {
        Worker worker = signalScheduler.createWorker();
        Flowable<Signal<?, Id>> o0 = subject //
                .toSerialized() //
                .toFlowable(BackpressureStrategy.BUFFER) //
                .mergeWith(signals) //
                // release the worker when downstream cancels
                .doOnCancel(() -> worker.dispose()) //
                .compose(preGroupBy);
        Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
        if (mapFactory != null) {
            // custom map factory supplied: use the groupBy overload that accepts it
            o = o0.groupBy(signal -> new ClassId(signal.cls(),
             signal.id()), x -> x, true, 16, mapFactory);
        } else {
            o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
                    Functions.identity());
        }
        return o.flatMap(g -> {
            Flowable<EntityStateMachine<?, Id>> obs = g //
                    .flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
                    // remember the latest state machine for this group key
                    .doOnNext(m -> stateMachines.put(g.getKey(), m)) //
                    .subscribeOn(processingScheduler); //

            Flowable<EntityStateMachine<?, Id>> res = entityTransform
                    .apply(grouped(g.getKey(), obs));
            return res;
        });
    });
}
 
源代码12 项目: RxAndroidAudio   文件: FileActivity.java
/**
 * Clears the log view and, if any audio files are queued, plays the next one
 * on the voice-call stream.
 *
 * When playback of one file completes this method is invoked again, so the
 * queue is drained one file at a time. Playback errors are printed via
 * {@code printStackTrace()}.
 */
@OnClick(R.id.mBtnPlay)
public void startPlay() {
    mTvLog.setText("");
    if (mAudioFiles.isEmpty()) {
        return;
    }

    File nextFile = mAudioFiles.poll();
    compositeDisposable.add(mRxAudioPlayer.play(
            PlayConfig.file(nextFile)
                    .streamType(AudioManager.STREAM_VOICE_CALL)
                    .build())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    Functions.emptyConsumer(),
                    error -> error.printStackTrace(),
                    // Chain to the next queued file once this one has finished.
                    () -> startPlay()));
}
 
源代码13 项目: rxjava-RxLife   文件: ObservableLife.java
/**
 * Subscribes without any caller-supplied callbacks.
 *
 * Delegates to the four-argument {@code subscribe} overload with a no-op
 * item consumer, {@code Functions.ON_ERROR_MISSING} as the error consumer,
 * {@code Functions.EMPTY_ACTION} as the completion action and a no-op
 * consumer as the last argument.
 */
@Override
public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
 
源代码14 项目: rxjava-RxLife   文件: ObservableLife.java
/**
 * Subscribes with only an item callback.
 *
 * Delegates to the four-argument {@code subscribe} overload, filling in
 * {@code Functions.ON_ERROR_MISSING}, {@code Functions.EMPTY_ACTION} and a
 * no-op consumer for the remaining arguments.
 *
 * @param onNext callback for emitted items
 */
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
 
源代码15 项目: rxjava-RxLife   文件: ObservableLife.java
/**
 * Subscribes with item and error callbacks.
 *
 * Delegates to the four-argument {@code subscribe} overload, filling in
 * {@code Functions.EMPTY_ACTION} as the completion action and a no-op
 * consumer as the last argument.
 *
 * @param onNext  callback for emitted items
 * @param onError callback for errors
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
 
源代码16 项目: rxjava-RxLife   文件: ObservableLife.java
/**
 * Subscribes with item, error and completion callbacks.
 *
 * Delegates to the four-argument {@code subscribe} overload, passing a
 * no-op consumer as the last argument.
 *
 * @param onNext     callback for emitted items
 * @param onError    callback for errors
 * @param onComplete action run on completion
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                  Action onComplete) {
    return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}
 
源代码17 项目: rxjava-RxLife   文件: SingleLife.java
/**
 * Subscribes without any caller-supplied callbacks.
 *
 * Delegates to the two-argument {@code subscribe} overload with a no-op
 * success consumer and {@code Functions.ON_ERROR_MISSING} as the error
 * consumer.
 */
@Override
public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}
 
源代码18 项目: rxjava-RxLife   文件: SingleLife.java
/**
 * Subscribes with only a success callback.
 *
 * Delegates to the two-argument {@code subscribe} overload, using
 * {@code Functions.ON_ERROR_MISSING} as the error consumer.
 *
 * @param onSuccess callback for the success value
 */
public final Disposable subscribe(Consumer<? super T> onSuccess) {
    return subscribe(onSuccess, Functions.ON_ERROR_MISSING);
}
 
源代码19 项目: rxjava-RxLife   文件: FlowableLife.java
/**
 * Subscribes without any caller-supplied callbacks.
 *
 * Delegates to the four-argument {@code subscribe} overload with a no-op
 * item consumer, {@code Functions.ON_ERROR_MISSING},
 * {@code Functions.EMPTY_ACTION} and
 * {@code FlowableInternalHelper.RequestMax.INSTANCE}.
 */
@Override
public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
            Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
 
源代码20 项目: rxjava-RxLife   文件: FlowableLife.java
/**
 * Subscribes with only an item callback.
 *
 * Delegates to the four-argument {@code subscribe} overload, filling in
 * {@code Functions.ON_ERROR_MISSING}, {@code Functions.EMPTY_ACTION} and
 * {@code FlowableInternalHelper.RequestMax.INSTANCE}.
 *
 * @param onNext callback for emitted items
 */
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING,
            Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
 
源代码21 项目: rxjava-RxLife   文件: FlowableLife.java
/**
 * Subscribes with item and error callbacks.
 *
 * Delegates to the four-argument {@code subscribe} overload, filling in
 * {@code Functions.EMPTY_ACTION} and
 * {@code FlowableInternalHelper.RequestMax.INSTANCE}.
 *
 * @param onNext  callback for emitted items
 * @param onError callback for errors
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
 
源代码22 项目: rxjava-RxLife   文件: MaybeLife.java
/**
 * Subscribes without any caller-supplied callbacks.
 *
 * Delegates to the three-argument {@code subscribe} overload with a no-op
 * success consumer, {@code Functions.ON_ERROR_MISSING} and
 * {@code Functions.EMPTY_ACTION}.
 */
@Override
public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
 
源代码23 项目: rxjava-RxLife   文件: MaybeLife.java
/**
 * Subscribes with only a success callback.
 *
 * Delegates to the three-argument {@code subscribe} overload, filling in
 * {@code Functions.ON_ERROR_MISSING} and {@code Functions.EMPTY_ACTION}.
 *
 * @param onSuccess callback for the success value
 */
public final Disposable subscribe(Consumer<? super T> onSuccess) {
    return subscribe(onSuccess, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
 
源代码24 项目: rxjava-RxLife   文件: MaybeLife.java
/**
 * Subscribes with success and error callbacks.
 *
 * Delegates to the three-argument {@code subscribe} overload, using
 * {@code Functions.EMPTY_ACTION} as the completion action.
 *
 * @param onSuccess callback for the success value
 * @param onError   callback for errors
 */
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
    return subscribe(onSuccess, onError, Functions.EMPTY_ACTION);
}
 
源代码25 项目: RxBus   文件: MyLambdaSubscriber.java
/**
 * Reports whether an error handler other than the
 * {@code Functions.ON_ERROR_MISSING} sentinel was supplied.
 *
 * The check is by reference identity, not {@code equals}.
 *
 * @return {@code true} when {@code onError} is not the sentinel instance
 */
@Override
public boolean hasCustomOnError() {
    return onError != Functions.ON_ERROR_MISSING;
}
 
源代码26 项目: MiPushFramework   文件: CheckPermissionsUtils.java
/**
 * Convenience overload that supplies no callbacks of its own.
 *
 * Delegates to the three-argument overload with a no-op result consumer and
 * {@code Functions.ON_ERROR_MISSING} as the error consumer.
 *
 * @param context activity passed through to the three-argument overload
 * @return the disposable returned by the delegated call
 */
public static Disposable checkPermissionsAndStartAsync (@NonNull FragmentActivity context) {
    return checkPermissionsAndStartAsync(context, Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}
 
源代码27 项目: MiPushFramework   文件: CheckPermissionsUtils.java
/**
 * Convenience overload that takes only a result callback.
 *
 * Delegates to the three-argument overload, using
 * {@code Functions.ON_ERROR_MISSING} as the error consumer.
 *
 * @param context activity passed through to the three-argument overload
 * @param onNext  callback receiving the combined activity/permission result
 * @return the disposable returned by the delegated call
 */
public static Disposable checkPermissionsAndStartAsync (@NonNull FragmentActivity context, @NonNull Consumer<ActivityResultAndPermissionResult> onNext) {
    return checkPermissionsAndStartAsync(context, onNext, Functions.ON_ERROR_MISSING);
}
 
/**
 * Returns a deferred observable that sets up (or reuses) a notification or
 * indication stream for the given characteristic.
 *
 * On subscription, under the {@code activeNotificationObservableMap} lock:
 * <ul>
 *   <li>if an entry already exists for the characteristic (keyed by UUID and
 *       instance id) with the same {@code isIndication} flag, that entry's
 *       observable is returned;</li>
 *   <li>if an entry exists with the opposite flag, the result errors with
 *       {@code BleConflictingNotificationAlreadySetException};</li>
 *   <li>otherwise a new shared observable ({@code replay(1).refCount()}) is
 *       built, stored in the map and returned.</li>
 * </ul>
 * When the new observable terminates or is disposed, the completion subject is
 * completed, the map entry is removed and the notification is switched off
 * again; the outcome of that teardown is ignored.
 *
 * @param characteristic the characteristic to observe
 * @param setupMode      mode passed to the setup and teardown transformers
 * @param isIndication   {@code true} to use the indication enable value,
 *                       {@code false} to use the notification enable value
 */
Observable<Observable<byte[]>> setupServerInitiatedCharacteristicRead(
        @NonNull final BluetoothGattCharacteristic characteristic, final NotificationSetupMode setupMode, final boolean isIndication
) {
    return Observable.defer(new Callable<ObservableSource<Observable<byte[]>>>() {
        @Override
        public ObservableSource<Observable<byte[]>> call() {
            synchronized (activeNotificationObservableMap) {
                final CharacteristicNotificationId id
                        = new CharacteristicNotificationId(characteristic.getUuid(), characteristic.getInstanceId());

                final ActiveCharacteristicNotification activeCharacteristicNotification = activeNotificationObservableMap.get(id);

                if (activeCharacteristicNotification != null) {
                    // Reuse the existing stream only when it was set up in the same mode.
                    if (activeCharacteristicNotification.isIndication == isIndication) {
                        return activeCharacteristicNotification.notificationObservable;
                    } else {
                        return Observable.error(
                                new BleConflictingNotificationAlreadySetException(characteristic.getUuid(), !isIndication)
                        );
                    }
                }

                final byte[] enableNotificationTypeValue = isIndication ? configEnableIndication : configEnableNotification;
                // Completed in doFinally below to terminate the inner observables.
                final PublishSubject<?> notificationCompletedSubject = PublishSubject.create();

                final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true)
                        .andThen(ObservableUtil.justOnNext(observeOnCharacteristicChangeCallbacks(gattCallback, id)))
                        .compose(setupModeTransformer(descriptorWriter, characteristic, enableNotificationTypeValue, setupMode))
                        .map(new Function<Observable<byte[]>, Observable<byte[]>>() {
                            @Override
                            public Observable<byte[]> apply(Observable<byte[]> observable) {
                                // Whichever source signals first wins: the completion subject
                                // or the value stream bounded by that same subject.
                                return Observable.amb(Arrays.asList(
                                        notificationCompletedSubject.cast(byte[].class),
                                        observable.takeUntil(notificationCompletedSubject)
                                ));
                            }
                        })
                        .doFinally(new Action() {
                            @SuppressLint("CheckResult")
                            @Override
                            public void run() {
                                notificationCompletedSubject.onComplete();
                                synchronized (activeNotificationObservableMap) {
                                    activeNotificationObservableMap.remove(id);
                                }
                                // teardown the notification — subscription and result are ignored
                                setCharacteristicNotification(bluetoothGatt, characteristic, false)
                                        .compose(teardownModeTransformer(descriptorWriter, characteristic, configDisable, setupMode))
                                        .subscribe(
                                                Functions.EMPTY_ACTION,
                                                Functions.emptyConsumer()
                                        );
                            }
                        })
                        .mergeWith(gattCallback.<Observable<byte[]>>observeDisconnect())
                        // Share one upstream subscription and replay the latest item to late subscribers.
                        .replay(1)
                        .refCount();
                activeNotificationObservableMap.put(id, new ActiveCharacteristicNotification(newObservable, isIndication));
                return newObservable;
            }
        }
    });
}
 
源代码29 项目: RxAndroidBle   文件: MtuWatcher.java
/**
 * Subscribes this object to {@code mtuObservable} as the value consumer and
 * stores the resulting subscription in {@code serialSubscription}. Errors are
 * swallowed by a no-op consumer.
 */
@Override
public void onConnectionSubscribed() {
    serialSubscription.set(mtuObservable.subscribe(this,
            // ignoring error, it is expected when the connection is lost.
            Functions.emptyConsumer()));
}
 
源代码30 项目: state-machine   文件: Processor.java
/**
 * Builds the per-signal processing function for one {@code ClassId}.
 *
 * For each signal, {@code process} is called with the class id, the signal's
 * event and the worker. Its results are collected into a list and then
 * re-emitted element by element, so nothing is emitted downstream until
 * processing of that signal has completed.
 *
 * @param worker  worker passed through to {@code process}
 * @param classId identifier passed through to {@code process}
 * @return a function mapping a signal to the flowable of resulting state machines
 */
private Function<? super Signal<?, Id>, Flowable<EntityStateMachine<?, Id>>> processSignalsToSelfAndSendSignalsToOthers(
        Worker worker, ClassId<?, Id> classId) {
    return signal -> process(classId, signal.event(), worker) //
            .toList() //
            .toFlowable().flatMapIterable(Functions.identity());
}
 
 类所在包
 类方法
 同包方法