下面列出了怎么用io.reactivex.internal.functions.Functions的API类实例代码及写法,或者点击链接到github查看源代码。
private void hydrate() {
WebClient webClient = WebClient.create(vertx);
webClient
.get(3001, "localhost", "/ranking-last-24-hours")
.as(BodyCodec.jsonArray())
.rxSend()
.delay(5, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
.retry(5)
.map(HttpResponse::body)
.flattenAsFlowable(Functions.identity())
.cast(JsonObject.class)
.flatMapSingle(json -> whoOwnsDevice(webClient, json))
.flatMapSingle(json -> fillWithUserProfile(webClient, json))
.subscribe(
this::hydrateEntryIfPublic,
err -> logger.error("Hydratation error", err),
() -> logger.info("Hydratation completed"));
}
void reset() {
hasCachedResults = false;
this.deviceServicesObservable = getListOfServicesFromGatt()
.map(wrapIntoRxBleDeviceServices())
.switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
.doOnSuccess(Functions.actionConsumer(new Action() {
@Override
public void run() {
hasCachedResults = true;
}
}))
.doOnError(Functions.actionConsumer(new Action() {
@Override
public void run() {
reset();
}
}))
.cache();
}
@Override
public void onCreate() {
super.onCreate();
instance = this;
CrashHandler.getInstance().init(this);
RxJavaPlugins.setErrorHandler(Functions.emptyConsumer());
try {
versionCode = getPackageManager().getPackageInfo(getPackageName(), 0).versionCode;
versionName = getPackageManager().getPackageInfo(getPackageName(), 0).versionName;
} catch (PackageManager.NameNotFoundException e) {
versionCode = 0;
versionName = "0.0.0";
}
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
createChannelId();
}
configPreferences = getSharedPreferences("CONFIG", 0);
downloadPath = configPreferences.getString(getString(R.string.pk_download_path), "");
if (TextUtils.isEmpty(downloadPath) | Objects.equals(downloadPath, FileHelp.getCachePath())) {
setDownloadPath(null);
}
initNightTheme();
if (!ThemeStore.isConfigured(this, versionCode)) {
upThemeStore();
}
AppFrontBackHelper.getInstance().register(this, new AppFrontBackHelper.OnAppStatusListener() {
@Override
public void onFront() {
//donateHb = System.currentTimeMillis() - configPreferences.getLong("DonateHb", 0) <= TimeUnit.DAYS.toMillis(7);
}
@Override
public void onBack() {
UpLastChapterModel.destroy();
}
});
upEInkMode();
}
public static <T> Disposable subscribe(Flowable<T> flowable,
Consumer<? super T> onNext,
Consumer<? super Throwable> onError) {
return subscribe(flowable,
onNext, onError,
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE
);
}
private void showMovieDetails(final int id) {
final MovieDetailsViewModel movie = viewModelFactory.createMovieDetailsViewModelById(id);
if (movie == null) {
return;
}
binding.setMovie(movie);
compositeDisposable.add(movie.loadDetails().subscribe(Functions.EMPTY_ACTION,
err -> { // onError
Log.w(TAG, "showMovieDetails onError: " + err.toString());
Toast.makeText(this, R.string.error_fetch_movie_details, LENGTH_LONG).show();
}));
loadSimilarMovies(movie);
}
@Override
public void onCreate() {
super.onCreate();
instance = this;
CrashHandler.getInstance().init(this);
RxJavaPlugins.setErrorHandler(Functions.emptyConsumer());
try {
versionCode = getPackageManager().getPackageInfo(getPackageName(), 0).versionCode;
versionName = getPackageManager().getPackageInfo(getPackageName(), 0).versionName;
} catch (PackageManager.NameNotFoundException e) {
versionCode = 0;
versionName = "0.0.0";
}
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
createChannelId();
}
configPreferences = getSharedPreferences("CONFIG", 0);
downloadPath = configPreferences.getString(getString(R.string.pk_download_path), "");
if (TextUtils.isEmpty(downloadPath) | Objects.equals(downloadPath, FileHelp.getCachePath())) {
setDownloadPath(null);
}
initNightTheme();
if (!ThemeStore.isConfigured(this, versionCode)) {
upThemeStore();
}
AppFrontBackHelper.getInstance().register(this, new AppFrontBackHelper.OnAppStatusListener() {
@Override
public void onFront() {
donateHb = System.currentTimeMillis() - configPreferences.getLong("DonateHb", 0) <= TimeUnit.DAYS.toMillis(30);
}
@Override
public void onBack() {
UpLastChapterModel.destroy();
}
});
upEInkMode();
}
@Override
public void onConnectionUnsubscribed() {
clientOperationQueue
.queue(operationDisconnect)
.subscribe(
Functions.emptyConsumer(),
Functions.emptyConsumer()
);
}
private void dismissCharacteristicNotification(UUID characteristicUuid, NotificationSetupMode setupMode, boolean isIndication) {
notificationObservableMap.remove(characteristicUuid);
setupCharacteristicNotification(characteristicUuid, setupMode, false, isIndication)
.subscribe(
Functions.EMPTY_ACTION,
Functions.emptyConsumer()
);
}
private static Function<ErrorAndDuration, Flowable<ErrorAndDuration>> delay(final Scheduler scheduler) {
return new Function<ErrorAndDuration, Flowable<ErrorAndDuration>>() {
@Override
public Flowable<ErrorAndDuration> apply(ErrorAndDuration e) {
if (e.durationMs() == NO_MORE_DELAYS)
return Flowable.error(e.throwable());
else
return Flowable.timer(e.durationMs(), TimeUnit.MILLISECONDS, scheduler)
.map(com.github.davidmoten.rx2.Functions.constant(e));
}
};
}
@Test
public void testAcceptSocketRejectsAlways() throws UnknownHostException, IOException, InterruptedException {
reset();
TestSubscriber<Object> ts = TestSubscriber.create();
try {
int bufferSize = 4;
AtomicInteger port = new AtomicInteger();
IO.serverSocketAutoAllocatePort(Consumers.set(port)) //
.readTimeoutMs(10000) //
.acceptTimeoutMs(200) //
.bufferSize(bufferSize) //
.acceptSocketIf(Functions.alwaysFalse()) //
.create() //
.subscribeOn(scheduler) //
.subscribe(ts);
Thread.sleep(300);
Socket socket = new Socket(LOCALHOST, port.get());
OutputStream out = socket.getOutputStream();
out.write("12345678901234567890".getBytes());
out.close();
socket.close();
Thread.sleep(1000);
ts.assertNoValues();
} finally {
// will close server socket
ts.dispose();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
return Flowable.defer(() -> {
Worker worker = signalScheduler.createWorker();
Flowable<Signal<?, Id>> o0 = subject //
.toSerialized() //
.toFlowable(BackpressureStrategy.BUFFER) //
.mergeWith(signals) //
.doOnCancel(() -> worker.dispose()) //
.compose(preGroupBy);
Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
if (mapFactory != null) {
o = o0.groupBy(signal -> new ClassId(signal.cls(),
signal.id()), x -> x, true, 16, mapFactory);
} else {
o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
Functions.identity());
}
return o.flatMap(g -> {
Flowable<EntityStateMachine<?, Id>> obs = g //
.flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
.doOnNext(m -> stateMachines.put(g.getKey(), m)) //
.subscribeOn(processingScheduler); //
Flowable<EntityStateMachine<?, Id>> res = entityTransform
.apply(grouped(g.getKey(), obs));
return res;
});
});
}
@OnClick(R.id.mBtnPlay)
public void startPlay() {
mTvLog.setText("");
if (!mAudioFiles.isEmpty()) {
File audioFile = mAudioFiles.poll();
compositeDisposable.add(mRxAudioPlayer.play(
PlayConfig.file(audioFile)
.streamType(AudioManager.STREAM_VOICE_CALL)
.build())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(Functions.emptyConsumer(), Throwable::printStackTrace,
this::startPlay));
}
}
@Override
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}
@Override
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ON_ERROR_MISSING);
}
@Override
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
@Override
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
return subscribe(onSuccess, onError, Functions.EMPTY_ACTION);
}
@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
public static Disposable checkPermissionsAndStartAsync (@NonNull FragmentActivity context) {
return checkPermissionsAndStartAsync(context, Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}
public static Disposable checkPermissionsAndStartAsync (@NonNull FragmentActivity context, @NonNull Consumer<ActivityResultAndPermissionResult> onNext) {
return checkPermissionsAndStartAsync(context, onNext, Functions.ON_ERROR_MISSING);
}
Observable<Observable<byte[]>> setupServerInitiatedCharacteristicRead(
@NonNull final BluetoothGattCharacteristic characteristic, final NotificationSetupMode setupMode, final boolean isIndication
) {
return Observable.defer(new Callable<ObservableSource<Observable<byte[]>>>() {
@Override
public ObservableSource<Observable<byte[]>> call() {
synchronized (activeNotificationObservableMap) {
final CharacteristicNotificationId id
= new CharacteristicNotificationId(characteristic.getUuid(), characteristic.getInstanceId());
final ActiveCharacteristicNotification activeCharacteristicNotification = activeNotificationObservableMap.get(id);
if (activeCharacteristicNotification != null) {
if (activeCharacteristicNotification.isIndication == isIndication) {
return activeCharacteristicNotification.notificationObservable;
} else {
return Observable.error(
new BleConflictingNotificationAlreadySetException(characteristic.getUuid(), !isIndication)
);
}
}
final byte[] enableNotificationTypeValue = isIndication ? configEnableIndication : configEnableNotification;
final PublishSubject<?> notificationCompletedSubject = PublishSubject.create();
final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true)
.andThen(ObservableUtil.justOnNext(observeOnCharacteristicChangeCallbacks(gattCallback, id)))
.compose(setupModeTransformer(descriptorWriter, characteristic, enableNotificationTypeValue, setupMode))
.map(new Function<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> apply(Observable<byte[]> observable) {
return Observable.amb(Arrays.asList(
notificationCompletedSubject.cast(byte[].class),
observable.takeUntil(notificationCompletedSubject)
));
}
})
.doFinally(new Action() {
@SuppressLint("CheckResult")
@Override
public void run() {
notificationCompletedSubject.onComplete();
synchronized (activeNotificationObservableMap) {
activeNotificationObservableMap.remove(id);
}
// teardown the notification — subscription and result are ignored
setCharacteristicNotification(bluetoothGatt, characteristic, false)
.compose(teardownModeTransformer(descriptorWriter, characteristic, configDisable, setupMode))
.subscribe(
Functions.EMPTY_ACTION,
Functions.emptyConsumer()
);
}
})
.mergeWith(gattCallback.<Observable<byte[]>>observeDisconnect())
.replay(1)
.refCount();
activeNotificationObservableMap.put(id, new ActiveCharacteristicNotification(newObservable, isIndication));
return newObservable;
}
}
});
}
@Override
public void onConnectionSubscribed() {
serialSubscription.set(mtuObservable.subscribe(this,
// ignoring error, it is expected when the connection is lost.
Functions.emptyConsumer()));
}
private Function<? super Signal<?, Id>, Flowable<EntityStateMachine<?, Id>>> processSignalsToSelfAndSendSignalsToOthers(
Worker worker, ClassId<?, Id> classId) {
return signal -> process(classId, signal.event(), worker) //
.toList() //
.toFlowable().flatMapIterable(Functions.identity());
}