下面列出了io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.disposables.Disposable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Create an observable from the given event source.
 *
 * <p>When the returned observable is subscribed to, the event source is subscribed to as well and
 * every value it delivers is passed on to the emitter. Cancelling the emitter disposes the event
 * source subscription.
 *
 * @param eventSource the eventSource you want to convert to an observable
 * @param <E> the event type
 * @return an Observable based on the provided event source
 */
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  return Observable.<E>create(
      emitter -> {
        // Forward everything the event source produces to the Rx emitter.
        final com.spotify.mobius.disposables.Disposable subscription =
            eventSource.subscribe(value -> emitter.onNext(value));
        // Tie the lifetime of the event source subscription to the emitter.
        emitter.setCancellable(() -> subscription.dispose());
      });
}
/**
 * Builds the observer for {@link ReportEntity} items.
 *
 * <p>Each received entity is passed to {@code flush}; subscription, error and completion
 * signals are ignored.
 *
 * @return a new observer instance
 */
@Override
public Observer<ReportEntity> getReportObserver() {
  final Observer<ReportEntity> reportObserver =
      new Observer<ReportEntity>() {
        @Override
        public void onNext(ReportEntity report) {
          flush(report);
        }

        @Override
        public void onSubscribe(Disposable subscription) {
          // The subscription handle is not retained.
        }

        @Override
        public void onError(Throwable error) {
          // Errors are ignored.
        }

        @Override
        public void onComplete() {
          // Nothing to do on completion.
        }
      };
  return reportObserver;
}
/**
 * Builds the observer for {@link ReportEntity} items.
 *
 * <p>The subscription handle is passed to {@code start} and each received entity is passed to
 * {@code flush}; error and completion signals are ignored.
 *
 * @return a new observer instance
 */
@Override
public Observer<ReportEntity> getReportObserver() {
  final Observer<ReportEntity> reportObserver =
      new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
          start(subscription);
        }

        @Override
        public void onNext(ReportEntity report) {
          flush(report);
        }

        @Override
        public void onComplete() {
          // Nothing to do on completion.
        }

        @Override
        public void onError(Throwable error) {
          // Errors are ignored.
        }
      };
  return reportObserver;
}
/**
 * Builds the observer for {@link ReportEntity} items.
 *
 * <p>The subscription handle is stored in {@code disp} and the most recently received entity in
 * {@code entity}; error and completion signals are ignored.
 *
 * @return a new observer instance
 */
@Override
public Observer<ReportEntity> getReportObserver() {
  final Observer<ReportEntity> reportObserver =
      new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
          disp = subscription;
        }

        @Override
        public void onNext(ReportEntity report) {
          entity = report;
        }

        @Override
        public void onComplete() {
          // Nothing to do on completion.
        }

        @Override
        public void onError(Throwable error) {
          // Errors are ignored.
        }
      };
  return reportObserver;
}
/**
 * Subscribes a collecting consumer to the sequential observable and checks that five items were
 * collected, exactly one span was finished, and no scope is left active on the tracer.
 */
@Test
public void consumerTest(final MockTracer tracer) {
  final Observable<Integer> source = createSequentialObservable(tracer, false);
  final List<Integer> received = new ArrayList<>();
  final Consumer<Integer> collector = consumer(received);

  final Disposable subscription = source.subscribe(collector);
  logger.fine(String.valueOf(subscription));

  assertEquals(5, received.size());

  final List<MockSpan> finished = tracer.finishedSpans();
  assertEquals(1, finished.size());

  assertNull(tracer.scopeManager().active());
}
/**
 * Creates an observer that logs each item and the completion signal under the given name and
 * appends every item to {@code result}.
 *
 * @param name label prepended to the log lines
 * @param result list that collects the received items
 * @param <T> the item type
 * @return a new observer instance
 */
private static <T> Observer<T> observer(final String name, final List<T> result) {
  final Observer<T> collecting =
      new Observer<T>() {
        @Override
        public void onNext(final T item) {
          logger.fine(name + ": " + item);
          result.add(item);
        }

        @Override
        public void onComplete() {
          logger.fine(name + ": onComplete");
        }

        @Override
        public void onError(final Throwable error) {
          error.printStackTrace();
        }

        @Override
        public void onSubscribe(final Disposable subscription) {
          // The subscription handle is not needed.
        }
      };
  return collecting;
}
/**
 * Loads the app list, runs it through a {@code SortFunction} and delivers the result to the
 * subscriber.
 *
 * <p>The list returned by the no-argument {@code getApps()} is used when it is non-null;
 * otherwise {@code getApps(Context, DateFormat)} supplies it. Work is subscribed on the io
 * scheduler and observed on the Android main thread.
 *
 * @param context context handed to {@code getApps(Context, DateFormat)} when needed
 * @param sortByTime passed to the {@code SortFunction}
 * @param dateFormat passed to {@code getApps} and to the {@code SortFunction}
 * @param subscriber receives the sorted result
 * @return the subscriber, as returned by {@code subscribeWith}
 */
public Disposable getAndSort(Context context, final boolean sortByTime, final DateFormat dateFormat, KWSubscriber<ItemArray> subscriber) {
  return Flowable.just(new WeakReference<>(context))
      .<List<App>>flatMap((WeakReference<Context> weakContext) -> {
        final List<App> existing = getApps();
        if (existing != null) {
          return Flowable.just(existing);
        }
        return getApps(weakContext.get(), dateFormat);
      })
      .map(new SortFunction(dateFormat, sortByTime))
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeWith(subscriber);
}
/**
 * Copies the app's APK into {@code dest} and emits the absolute path of the copy.
 *
 * <p>The target file name is {@code <name>_<versionName>.apk}. Copy progress is posted through
 * {@code mHandler} to {@code subscriber.inProgress}. The copy is subscribed on the io scheduler
 * and the result is observed on the Android main thread.
 *
 * @param app the app whose {@code apkPath} is copied
 * @param dest destination passed to {@code FileUtil.copy}
 * @param subscriber receives progress updates and the resulting path
 * @return the subscriber, as returned by {@code subscribeWith}
 */
@Deprecated
public Disposable saveApk(App app, final String dest, final KWSubscriber<String> subscriber) {
  return Flowable.just(app)
      .map(source -> {
        final String targetName = source.name + "_" + source.versionName + ".apk";
        final OnCopyListener progressRelay = new OnCopyListener() {
          @Override
          public void inProgress(final float progress) {
            // Hand the progress value over to the handler.
            mHandler.post(() -> subscriber.inProgress(progress));
          }
        };
        return FileUtil.copy(source.apkPath, dest, targetName, progressRelay).getAbsolutePath();
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeWith(subscriber);
}
/**
 * Copies the app's APK to the {@code dest} Uri through the given content resolver and emits
 * {@code dest} once the copy call returns.
 *
 * <p>Copy progress is posted through {@code mHandler} to {@code subscriber.inProgress}. The
 * copy is subscribed on the io scheduler and the result is observed on the Android main thread.
 *
 * @param resolver content resolver passed to {@code FileUtil.copy}
 * @param app the app whose {@code apkPath} is copied
 * @param dest destination Uri
 * @param subscriber receives progress updates and the destination Uri
 * @return the subscriber, as returned by {@code subscribeWith}
 */
public Disposable saveApk(ContentResolver resolver, App app, final Uri dest, final KWSubscriber<Uri> subscriber) {
  return Flowable.just(app)
      .map(source -> {
        final Uri origin = Uri.fromFile(new File(source.apkPath));
        final OnCopyListener progressRelay = new OnCopyListener() {
          @Override
          public void inProgress(final float progress) {
            // Hand the progress value over to the handler.
            mHandler.post(() -> subscriber.inProgress(progress));
          }
        };
        FileUtil.copy(resolver, origin, dest, progressRelay);
        return dest;
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeWith(subscriber);
}
/**
 * Requests the JSON hello-world resource through the Rx observable invoker and checks the
 * greeting and audience of the received bean.
 *
 * <p>The test waits on a latch that is released as soon as the value arrives (bounded by two
 * seconds) instead of sleeping for a fixed time. The latch also makes the write to
 * {@code holder.value} visible to the asserting thread.
 */
@Test
public void testGetHelloWorldJson() throws Exception {
  String address = "http://localhost:" + PORT + "/rx3/observable/textJson";
  List<Object> providers = new LinkedList<>();
  providers.add(new JacksonJsonProvider());
  providers.add(new ObservableRxInvokerProvider());
  WebClient wc = WebClient.create(address, providers);
  Observable<HelloWorldBean> obs = wc.accept("application/json")
      .rx(ObservableRxInvoker.class)
      .get(HelloWorldBean.class);

  Holder<HelloWorldBean> holder = new Holder<>();
  final java.util.concurrent.CountDownLatch received = new java.util.concurrent.CountDownLatch(1);
  Disposable d = obs.subscribe(v -> {
    holder.value = v;
    received.countDown();
  });
  if (d == null) {
    throw new IllegalStateException("Subscribe did not return a Disposable");
  }

  // Same upper bound as the former fixed sleep, but returns as soon as the value is there.
  if (!received.await(2, java.util.concurrent.TimeUnit.SECONDS)) {
    throw new AssertionError("No HelloWorldBean was received within 2 seconds");
  }
  assertEquals("Hello", holder.value.getGreeting());
  assertEquals("World", holder.value.getAudience());
}
/**
 * Create an event source from the given RxJava streams.
 *
 * <p>All streams must be mapped to your event type. The streams are merged; each subscription to
 * the returned event source subscribes to the merged stream, forwards values to the event
 * consumer and routes errors to {@code RxJavaPlugins.onError}.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> merged = Observable.mergeArray(sources);
  return new EventSource<E>() {
    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      final Disposable rxSubscription =
          merged.subscribe(
              value -> eventConsumer.accept(value),
              error -> RxJavaPlugins.onError(error));
      // Expose the Rx subscription through the Mobius disposable type.
      return () -> rxSubscription.dispose();
    }
  };
}
/**
 * Creates a {@code Disposable} whose {@code dispose()} does nothing and whose
 * {@code isDisposed()} always reports {@code true}.
 *
 * @return the new instance, typed as {@code Object}
 */
public static Object disposable() {
  final Disposable alwaysDisposed =
      new Disposable() {
        @Override
        public boolean isDisposed() {
          return true;
        }

        @Override
        public void dispose() {
          // Nothing to release.
        }
      };
  return alwaysDisposed;
}
/**
 * Adds the disposable to the composite collection.
 *
 * <p>A {@code null} argument is ignored. When {@code mCompositeDisposable} is {@code null} the
 * disposable is disposed immediately instead of being added.
 *
 * @param disposable the disposable to add; may be {@code null}
 */
public void addDisposable(Disposable disposable) {
  if (disposable == null) {
    return;
  }
  if (mCompositeDisposable == null) {
    // No container to add to: dispose right away.
    disposable.dispose();
    return;
  }
  mCompositeDisposable.add(disposable);
}
/**
 * Removes the disposable from the composite collection.
 *
 * <p>A {@code null} argument is ignored. When {@code mCompositeDisposable} is {@code null} the
 * disposable is disposed directly, unless it already reports itself as disposed.
 *
 * @param disposable the disposable to remove; may be {@code null}
 */
public void removeDisposable(Disposable disposable) {
  if (disposable == null) {
    return;
  }
  if (mCompositeDisposable != null) {
    mCompositeDisposable.remove(disposable);
    return;
  }
  if (!disposable.isDisposed()) {
    disposable.dispose();
  }
}
/**
 * Reads the package archive referenced by {@code path} and emits an {@code App} describing it.
 *
 * <p>On API level Q and above the content behind {@code path} is first copied to
 * {@code temp.apk} in the external cache directory and parsed from there; on older versions the
 * path is resolved with {@code FileUtil.getPath}. The work is subscribed on the io scheduler and
 * the result is observed on the Android main thread.
 *
 * <p>A {@link NullPointerException} is signalled to the subscriber when the context is no longer
 * reachable or when the package manager cannot parse the archive.
 *
 * @param context context used for the package manager, cache directory and content resolver
 * @param path Uri of the archive to inspect
 * @param subscriber receives the resulting {@code App} or the error
 * @return the subscriber, as returned by {@code subscribeWith}
 */
public Disposable getApp(Context context, final Uri path, KWSubscriber<App> subscriber) {
  return Flowable.just(new WeakReference<>(context))
      .map(weakContext -> {
        // Resolve the weak reference once so every later use sees the same, non-null context.
        final Context ctx = weakContext.get();
        if (ctx == null) {
          throw new NullPointerException("Context is no longer available");
        }
        PackageManager pm = ctx.getPackageManager();
        String realPath;
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
          File file = new File(ctx.getExternalCacheDir(), "temp.apk");
          file.deleteOnExit();
          realPath = file.getAbsolutePath();
          FileUtil.copy(ctx.getContentResolver(), path, Uri.fromFile(file), null);
        } else {
          realPath = FileUtil.getPath(ctx, path);
        }
        PackageInfo info = pm.getPackageArchiveInfo(realPath, 0);
        if (info == null) {
          throw new NullPointerException("Unable to read package archive: " + realPath);
        }
        // Point the application info at the archive file before building the App from it.
        ApplicationInfo applicationInfo = info.applicationInfo;
        applicationInfo.sourceDir = realPath;
        applicationInfo.publicSourceDir = realPath;
        App app = new App(info, pm);
        app.isFormFile = true;
        return app;
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeWith(subscriber);
}
/**
 * Checks that the disposable handed to {@code onSubscribe} starts out not disposed and reports
 * disposed after {@code dispose()} is called on it.
 */
@Test
public void innerDisposed() {
  final Observer<Object> disposingObserver =
      new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable subscription) {
          assertFalse(subscription.isDisposed());
          subscription.dispose();
          assertTrue(subscription.isDisposed());
        }

        @Override
        public void onNext(Object item) {
          // Not relevant for this test.
        }

        @Override
        public void onComplete() {
          // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
          // Not relevant for this test.
        }
      };

  BehaviorRelay.create().subscribe(disposingObserver);
}
/**
 * Wraps a {@code Cancelable} in a {@code Disposable}: {@code dispose()} delegates to
 * {@code cancel()} and {@code isDisposed()} delegates to {@code isCanceled()}.
 *
 * @param cancelable the cancelable to delegate to
 * @return a disposable view of {@code cancelable}
 */
private static Disposable getRx3Disposable(final Cancelable cancelable) {
  final Disposable adapter =
      new Disposable() {
        @Override
        public boolean isDisposed() {
          return cancelable.isCanceled();
        }

        @Override
        public void dispose() {
          cancelable.cancel();
        }
      };
  return adapter;
}
/**
 * Subscribes a test observer to a query call, disposes the subscription, and then asserts that
 * the observer has completed and reports itself as disposed.
 */
@Test
public void callIsCanceledWhenDisposed() throws Exception {
  server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

  final EpisodeHeroNameQuery query = new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE));
  final TestObserver<Response<EpisodeHeroNameQuery.Data>> observer = new TestObserver<>();

  final Disposable subscription = Rx3Apollo
      .from(apolloClient.query(query))
      .subscribeWith(observer);
  subscription.dispose();

  observer.assertComplete();
  assertThat(observer.isDisposed()).isTrue();
}
/**
 * Subscribes a test observer to a prefetch call observed on a {@code TestScheduler}, disposes
 * the subscription, and then asserts that the observer has not completed and reports itself as
 * disposed.
 */
@Test
public void prefetchIsCanceledWhenDisposed() throws Exception {
  server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

  final EpisodeHeroNameQuery query = new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE));
  final TestObserver<EpisodeHeroNameQuery.Data> observer = new TestObserver<>();

  final Disposable subscription = Rx3Apollo
      .from(apolloClient.prefetch(query))
      .observeOn(new TestScheduler())
      .subscribeWith(observer);
  subscription.dispose();

  observer.assertNotComplete();
  assertThat(observer.isDisposed()).isTrue();
}
/**
 * Passes the disposable on to the downstream observer and then, if a last-seen value is present
 * and the disposable has not been disposed in the meantime, emits that value downstream.
 */
@Override
public void onSubscribe(Disposable d) {
  downstream.onSubscribe(d);

  final T cached = lastSeen.value;
  if (cached == null || d.isDisposed()) {
    return;
  }
  downstream.onNext(cached);
}
/**
 * Tracks upstream subscribe and dispose callbacks with a counter while three observers
 * subscribe to and then dispose from the shared observable. The counter is expected to stay at
 * one until the last observer is disposed, and then drop to zero.
 */
@Test
public void refCountToUpstream() {
  final PublishSubject<String> subject = PublishSubject.create();
  final AtomicInteger upstreamSubscriptions = new AtomicInteger();

  final Observable<String> shared = subject
      .doOnSubscribe(ignored -> upstreamSubscriptions.incrementAndGet())
      .doOnDispose(() -> upstreamSubscriptions.decrementAndGet())
      .compose(ReplayingShare.<String>instance());

  final Disposable first = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  final Disposable second = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  final Disposable third = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  first.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  third.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  // Only disposing the last remaining observer reaches the upstream.
  second.dispose();
  assertEquals(0, upstreamSubscriptions.get());
}
/**
 * Checks that the disposable handed to {@code onSubscribe} starts out not disposed and reports
 * disposed after {@code dispose()} is called on it.
 */
@Test
public void innerDisposed() {
  final Observer<Object> disposingObserver =
      new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable subscription) {
          assertFalse(subscription.isDisposed());
          subscription.dispose();
          assertTrue(subscription.isDisposed());
        }

        @Override
        public void onNext(Object item) {
          // Not relevant for this test.
        }

        @Override
        public void onComplete() {
          // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
          // Not relevant for this test.
        }
      };

  BehaviorRelay.create().subscribe(disposingObserver);
}
/**
 * Accepts the upstream disposable when {@code DisposableHelper.validate} approves it, stores it,
 * and signals {@code onSubscribe(this)} to {@code actual} while the resource returned by
 * {@code assemblyContext.push()} is open.
 */
@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.validate(disposable, d)) {
    disposable = d;
    try (SafeCloseable ignored = assemblyContext.push()) {
      actual.onSubscribe(this);
    }
  }
}
/**
 * Accepts the upstream disposable when {@code DisposableHelper.validate} approves it, stores it,
 * and signals {@code onSubscribe(this)} to {@code actual} while the resource returned by
 * {@code assemblyContext.push()} is open.
 */
@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.validate(disposable, d)) {
    disposable = d;
    try (SafeCloseable ignored = assemblyContext.push()) {
      actual.onSubscribe(this);
    }
  }
}
/**
 * Accepts the upstream disposable when {@code DisposableHelper.validate} approves it, stores it,
 * and signals {@code onSubscribe(this)} to {@code actual} while the resource returned by
 * {@code assemblyContext.push()} is open.
 */
@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.validate(disposable, d)) {
    disposable = d;
    try (SafeCloseable ignored = assemblyContext.push()) {
      actual.onSubscribe(this);
    }
  }
}
/**
 * Subscribes to the {@code Single} and connects it to a new asynchronous response: the success
 * value resumes the response, a failure goes to {@code handleThrowable}.
 *
 * @param inMessage message the asynchronous response is created for
 * @param single the source to subscribe to
 * @return the asynchronous response
 * @throws IllegalStateException if {@code subscribe} returns {@code null}
 */
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
  final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
  final Disposable subscription =
      single.subscribe(
          value -> asyncResponse.resume(value),
          error -> handleThrowable(asyncResponse, error));
  if (subscription == null) {
    throw new IllegalStateException("Subscribe did not return a Disposable");
  }
  return asyncResponse;
}
/**
 * Connects the {@code Flowable} to a new asynchronous response.
 *
 * <p>When {@code isStreamingSubscriberUsed} returns {@code true} nothing more is done here.
 * Otherwise the flowable is subscribed to: emitted values resume the response and a failure
 * goes to {@code handleThrowable}.
 *
 * @param inMessage message the asynchronous response is created for
 * @param f the source to subscribe to
 * @return the asynchronous response
 * @throws IllegalStateException if {@code subscribe} returns {@code null}
 */
protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
  final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
  if (isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
    return asyncResponse;
  }
  final Disposable subscription =
      f.subscribe(
          value -> asyncResponse.resume(value),
          error -> handleThrowable(asyncResponse, error));
  if (subscription == null) {
    throw new IllegalStateException("Subscribe did not return a Disposable");
  }
  return asyncResponse;
}
/**
 * Subscribes to the {@code Observable} and connects it to a new asynchronous response: emitted
 * values resume the response, a failure goes to {@code handleThrowable}.
 *
 * @param inMessage message the asynchronous response is created for
 * @param obs the source to subscribe to
 * @return the asynchronous response
 * @throws IllegalStateException if {@code subscribe} returns {@code null}
 */
protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
  final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
  final Disposable subscription =
      obs.subscribe(
          asyncResponse::resume,
          error -> handleThrowable(asyncResponse, error));
  if (subscription == null) {
    throw new IllegalStateException("Subscribe did not return a Disposable");
  }
  return asyncResponse;
}
/**
 * Starts an endless stream of {@code VideoFrame}s and returns the handle for stopping it.
 *
 * <p>Frames are numbered from 0 upwards; {@code sleep(produceDelay)} is called before each
 * subsequent frame is created and {@code sleep(consumeDelay)} for each frame consumed. Production
 * and consumption each run on their own single-thread executor, with a bounded buffer between
 * them that uses {@code BackpressureOverflowStrategy.ERROR}.
 *
 * <p>Both executors are shut down once the stream terminates or the returned disposable is
 * disposed, so their threads are not left behind.
 *
 * @param produceDelay value passed to {@code sleep} before each new frame is produced
 * @param consumeDelay value passed to {@code sleep} for each consumed frame
 * @param bufferSize capacity of the backpressure buffer
 * @param onError run when the stream signals an error
 * @return the disposable for the running stream
 */
public static Disposable streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError) {
  // Keep references to the executors so they can be shut down when the stream ends.
  final java.util.concurrent.ExecutorService producerExecutor =
      Executors.newSingleThreadScheduledExecutor();
  final java.util.concurrent.ExecutorService consumerExecutor =
      Executors.newSingleThreadExecutor();

  return Flowable
      .fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> {
        sleep(produceDelay);
        return new VideoFrame(videoFrame.getNumber() + 1);
      }))
      .subscribeOn(Schedulers.from(producerExecutor), true)
      .onBackpressureBuffer(bufferSize, null, BackpressureOverflowStrategy.ERROR)
      .observeOn(Schedulers.from(consumerExecutor))
      // Runs after termination or disposal; shutdown() lets an in-flight task finish.
      .doFinally(() -> {
        producerExecutor.shutdown();
        consumerExecutor.shutdown();
      })
      .subscribe(item -> {
        sleep(consumeDelay);
      }, throwable -> {
        onError.run();
      });
}
/**
 * Turns a stream of events into a stream of models by running a Mobius loop per subscription.
 *
 * <p>On subscription a loop is started from {@code startModel} (with {@code startEffects} when
 * they are non-null). Models observed on the loop are emitted downstream, upstream events are
 * dispatched to the loop, and an upstream error is emitted wrapped in an
 * {@code UnrecoverableIncomingException}. Cancelling the emitter disposes the loop and then the
 * upstream subscription.
 *
 * @param upstream the events to feed into the loop
 * @return an observable source of models
 */
@Override
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
  return Observable.<M>create(
      emitter -> {
        final MobiusLoop<M, E, ?> loop;
        if (startEffects != null) {
          loop = loopFactory.startFrom(startModel, startEffects);
        } else {
          loop = loopFactory.startFrom(startModel);
        }

        // Models coming out of the loop go to the downstream emitter.
        loop.observe(model -> emitter.onNext(model));

        // Events coming from upstream go into the loop.
        final Disposable eventsSubscription =
            upstream.subscribe(
                event -> loop.dispatchEvent(event),
                failure -> emitter.onError(new UnrecoverableIncomingException(failure)));

        emitter.setCancellable(
            () -> {
              loop.dispose();
              eventsSubscription.dispose();
            });
      });
}