io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.disposables.Disposable源码实例Demo

下面列出了 io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.disposables.Disposable 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: mobius   文件: RxEventSources.java
/**
 * Adapts a Mobius {@link EventSource} to an RxJava {@link Observable}.
 *
 * <p>The event source is subscribed to inside the observable's subscribe callback, and every
 * event it delivers is forwarded to the emitter via {@code onNext}. The resulting event source
 * subscription is disposed when the emitter's cancellable is invoked.
 *
 * @param eventSource the eventSource you want to convert to an observable
 * @param <E> the event type
 * @return an Observable based on the provided event source
 */
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  return Observable.create(
      emitter -> {
        // Forward every event from the source straight to the downstream emitter.
        final com.spotify.mobius.disposables.Disposable sourceSubscription =
            eventSource.subscribe(value -> emitter.onNext(value));
        // Release the event source subscription once the emitter is cancelled.
        emitter.setCancellable(() -> sourceSubscription.dispose());
      });
}
 
源代码2 项目: extentreports-java   文件: JsonFormatter.java
/**
 * Supplies the observer through which report entities reach this object.
 *
 * <p>Each emitted entity is handed to {@code flush}; the subscription, error and completion
 * callbacks do nothing.
 *
 * @return a new observer that flushes every entity it receives
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            // The subscription handle is not retained.
        }

        @Override
        public void onNext(ReportEntity reportEntity) {
            flush(reportEntity);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored here.
        }

        @Override
        public void onComplete() {
            // Completion requires no action.
        }
    };
    return reportObserver;
}
 
源代码3 项目: extentreports-java   文件: ExtentSparkReporter.java
/**
 * Supplies the observer through which report entities reach this reporter.
 *
 * <p>The subscription handle is passed to {@code start} and each emitted entity is handed to
 * {@code flush}; the error and completion callbacks do nothing.
 *
 * @return a new observer wired to {@code start} and {@code flush}
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            start(subscription);
        }

        @Override
        public void onNext(ReportEntity reportEntity) {
            flush(reportEntity);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored here.
        }

        @Override
        public void onComplete() {
            // Completion requires no action.
        }
    };
    return reportObserver;
}
 
/**
 * Supplies an observer that records what it receives.
 *
 * <p>The subscription handle is stored in {@code disp} and the most recently emitted entity in
 * {@code entity}; the error and completion callbacks do nothing.
 *
 * @return a new observer that captures the subscription and the latest entity
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> capturingObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            disp = subscription;
        }

        @Override
        public void onNext(ReportEntity latest) {
            // Each new emission overwrites the previously stored entity.
            entity = latest;
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored here.
        }

        @Override
        public void onComplete() {
            // Completion requires no action.
        }
    };
    return capturingObserver;
}
 
源代码5 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Subscribes a plain {@code Consumer} to the sequential observable and checks that five items
 * were collected, exactly one span was finished and no scope is active afterwards.
 */
@Test
public void consumerTest(final MockTracer tracer) {
  final Observable<Integer> source = createSequentialObservable(tracer, false);
  final List<Integer> received = new ArrayList<>();
  final Consumer<Integer> collector = consumer(received);

  final Disposable subscription = source.subscribe(collector);
  logger.fine(String.valueOf(subscription));

  assertEquals(5, received.size());

  final List<MockSpan> finishedSpans = tracer.finishedSpans();
  assertEquals(1, finishedSpans.size());

  assertNull(tracer.scopeManager().active());
}
 
源代码6 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Builds an observer that logs each item and the completion signal under the given name and
 * appends every received item to {@code result}.
 *
 * @param name label prepended to every log message
 * @param result list that collects the received items
 * @param <T> the item type
 * @return a new logging, collecting observer
 */
private static <T>Observer<T> observer(final String name, final List<T> result) {
  // Every log line starts with the same "<name>: " prefix.
  final String prefix = name + ": ";
  return new Observer<T>() {
    @Override
    public void onSubscribe(final Disposable subscription) {
      // The subscription handle is not needed.
    }

    @Override
    public void onNext(final T item) {
      logger.fine(prefix + item);
      result.add(item);
    }

    @Override
    public void onError(final Throwable error) {
      error.printStackTrace();
    }

    @Override
    public void onComplete() {
      logger.fine(prefix + "onComplete");
    }
  };
}
 
源代码7 项目: GetApk   文件: LocalRepository.java
/**
 * Obtains the app list, runs it through a {@code SortFunction} and delivers the result to the
 * given subscriber.
 *
 * <p>The work is subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param context context handed (through a weak reference) to the app loader when needed
 * @param sortByTime flag passed to the {@code SortFunction}
 * @param dateFormat date format passed to the loader and to the {@code SortFunction}
 * @param subscriber receiver of the sorted result
 * @return the subscriber, usable as a handle for disposing the operation
 */
public Disposable getAndSort(Context context, final boolean sortByTime, final DateFormat dateFormat, KWSubscriber<ItemArray> subscriber) {
    // Use the list from the no-arg getApps() when it is non-null; otherwise load via the context.
    final Function<WeakReference<Context>, Publisher<List<App>>> appSource = contextRef -> {
        final List<App> existing = getApps();
        if (existing != null) {
            return Flowable.just(existing);
        }
        return getApps(contextRef.get(), dateFormat);
    };

    return Flowable.just(new WeakReference<>(context))
            .flatMap(appSource)
            .map(new SortFunction(dateFormat, sortByTime))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码8 项目: GetApk   文件: LocalRepository.java
/**
 * Copies the app's APK into {@code dest} and emits the absolute path of the copy.
 *
 * <p>The target file name is {@code <name>_<versionName>.apk}. Copy progress is posted through
 * {@code mHandler} to {@code subscriber.inProgress}. The copy is subscribed on the IO scheduler
 * and the result is observed on the Android main thread.
 *
 * @param app the app whose APK should be copied
 * @param dest destination passed to {@code FileUtil.copy}
 * @param subscriber receiver of progress updates and of the resulting path
 * @return the subscriber, usable as a handle for disposing the operation
 */
@Deprecated
public Disposable saveApk(App app, final String dest, final KWSubscriber<String> subscriber) {
    return Flowable.just(app)
            .map(source -> {
                final String targetName = source.name + "_" + source.versionName + ".apk";
                final OnCopyListener progressRelay = new OnCopyListener() {
                    @Override
                    public void inProgress(final float progress) {
                        // Hand the progress value over to the handler before notifying the subscriber.
                        mHandler.post(() -> subscriber.inProgress(progress));
                    }
                };
                return FileUtil.copy(source.apkPath, dest, targetName, progressRelay).getAbsolutePath();
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码9 项目: GetApk   文件: LocalRepository.java
/**
 * Copies the app's APK to the {@code dest} URI through the given content resolver and emits
 * {@code dest} once the copy call returns.
 *
 * <p>Copy progress is posted through {@code mHandler} to {@code subscriber.inProgress}. The copy
 * is subscribed on the IO scheduler and the result is observed on the Android main thread.
 *
 * @param resolver content resolver passed to {@code FileUtil.copy}
 * @param app the app whose APK should be copied
 * @param dest destination URI
 * @param subscriber receiver of progress updates and of the destination URI
 * @return the subscriber, usable as a handle for disposing the operation
 */
public Disposable saveApk(ContentResolver resolver, App app, final Uri dest, final KWSubscriber<Uri> subscriber) {
    return Flowable.just(app)
            .map(source -> {
                final Uri apkUri = Uri.fromFile(new File(source.apkPath));
                final OnCopyListener progressRelay = new OnCopyListener() {
                    @Override
                    public void inProgress(final float progress) {
                        // Hand the progress value over to the handler before notifying the subscriber.
                        mHandler.post(() -> subscriber.inProgress(progress));
                    }
                };
                FileUtil.copy(resolver, apkUri, dest, progressRelay);
                return dest;
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码10 项目: cxf   文件: JAXRSRxJava3ObservableTest.java
/**
 * Requests the JSON endpoint through the RxJava 3 observable invoker and checks that the
 * received bean carries the greeting "Hello" and the audience "World".
 */
@Test
public void testGetHelloWorldJson() throws Exception {
    final List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new ObservableRxInvokerProvider());

    final String address = "http://localhost:" + PORT + "/rx3/observable/textJson";
    final WebClient client = WebClient.create(address, providers);
    final Observable<HelloWorldBean> beans = client.accept("application/json")
        .rx(ObservableRxInvoker.class)
        .get(HelloWorldBean.class);

    final Holder<HelloWorldBean> received = new Holder<>();
    final Disposable subscription = beans.subscribe(bean -> received.value = bean);
    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    // Wait before asserting so the subscriber has had time to store the bean.
    Thread.sleep(2000);
    assertEquals("Hello", received.value.getGreeting());
    assertEquals("World", received.value.getAudience());
}
 
源代码11 项目: mobius   文件: RxEventSources.java
/**
 * Merges the given RxJava streams into a single Mobius {@link EventSource}.
 *
 * <p>All streams must be mapped to your event type. Each call to {@code subscribe} on the
 * returned event source subscribes to the merged observable; items are passed to the event
 * consumer and errors are routed to {@code RxJavaPlugins.onError}.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> merged = Observable.mergeArray(sources);
  return new EventSource<E>() {

    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      final Disposable rxSubscription =
          merged.subscribe(
              value -> eventConsumer.accept(value),
              error -> RxJavaPlugins.onError(error));
      // Expose the RxJava subscription through the Mobius disposable type.
      return () -> rxSubscription.dispose();
    }
  };
}
 
源代码12 项目: java-specialagent   文件: RxJava3AgentIntercept.java
/**
 * Creates a {@code Disposable} that always reports itself as disposed and whose
 * {@code dispose()} does nothing.
 *
 * @return a new already-disposed disposable, typed as {@code Object}
 */
public static Object disposable() {
  final Disposable alreadyDisposed = new Disposable() {
    @Override
    public boolean isDisposed() {
      return true;
    }

    @Override
    public void dispose() {
      // Nothing to release.
    }
  };
  return alreadyDisposed;
}
 
源代码13 项目: GetApk   文件: BaseDialog.java
/**
 * Adds a disposable to the composite disposable.
 *
 * <p>A {@code null} argument is ignored. When {@code mCompositeDisposable} is {@code null}, the
 * given disposable is disposed immediately instead of being stored.
 *
 * @param disposable the disposable to track; may be {@code null}
 */
public void addDisposable(Disposable disposable) {
    if (disposable == null) {
        return;
    }
    if (mCompositeDisposable == null) {
        // No container to hold it, so release it right away.
        disposable.dispose();
        return;
    }
    mCompositeDisposable.add(disposable);
}
 
源代码14 项目: GetApk   文件: BaseDialog.java
/**
 * Removes a disposable from the composite disposable.
 *
 * <p>A {@code null} argument is ignored. When {@code mCompositeDisposable} is {@code null}, the
 * given disposable is disposed directly unless it already reports itself as disposed.
 *
 * @param disposable the disposable to remove; may be {@code null}
 */
public void removeDisposable(Disposable disposable) {
    if (disposable == null) {
        return;
    }
    if (mCompositeDisposable != null) {
        mCompositeDisposable.remove(disposable);
        return;
    }
    if (!disposable.isDisposed()) {
        disposable.dispose();
    }
}
 
源代码15 项目: GetApk   文件: LocalRepository.java
/**
 * Builds an {@code App} description for the APK behind the given URI and delivers it to the
 * subscriber.
 *
 * <p>On Android Q and later the content is first copied to {@code temp.apk} in the external
 * cache directory; on older versions the path is resolved with {@code FileUtil.getPath}. The
 * work is subscribed on the IO scheduler and the result is observed on the Android main thread.
 * Failures inside the mapping step (including the {@link NullPointerException}s thrown below)
 * are thrown from the map function rather than from this method.
 *
 * @param context context used to reach the package manager, cache directory and content
 *     resolver; only held through a weak reference until the mapping step runs
 * @param path URI of the APK to inspect
 * @param subscriber receiver of the resulting {@code App}
 * @return the subscriber, usable as a handle for disposing the operation
 */
public Disposable getApp(Context context, final Uri path, KWSubscriber<App> subscriber) {
    return Flowable.just(new WeakReference<>(context))
            .map(weakContext -> {
                // Dereference once and keep a strong reference for the whole step: repeated
                // get() calls could start returning null partway through the work.
                final Context ctx = weakContext.get();
                if (ctx == null) {
                    throw new NullPointerException("Context is no longer available");
                }
                PackageManager pm = ctx.getPackageManager();
                String realPath;
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q){
                    File file = new File(ctx.getExternalCacheDir(), "temp.apk");
                    file.deleteOnExit();
                    realPath = file.getAbsolutePath();
                    // Reuse the File created above instead of building a second one from its path.
                    FileUtil.copy(ctx.getContentResolver(), path, Uri.fromFile(file), null);
                }else {
                    realPath = FileUtil.getPath(ctx, path);
                }
                PackageInfo info = pm.getPackageArchiveInfo(realPath, 0);
                if (info == null) {
                    throw new NullPointerException("No package info could be read from " + realPath);
                }
                ApplicationInfo applicationInfo = info.applicationInfo;
                // Point the application info at the archive file itself.
                applicationInfo.sourceDir = realPath;
                applicationInfo.publicSourceDir = realPath;
                App app = new App(info, pm);
                app.isFormFile = true;
                return app;
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码16 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * Checks that the disposable handed to an observer of a fresh {@code BehaviorRelay} is not
 * disposed at first and reports disposed right after {@code dispose()} is called.
 */
@Test
public void innerDisposed() {
    final Observer<Object> disposingObserver = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            assertFalse(subscription.isDisposed());

            subscription.dispose();

            assertTrue(subscription.isDisposed());
        }

        @Override
        public void onNext(Object item) {
            // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
            // Not relevant for this test.
        }

        @Override
        public void onComplete() {
            // Not relevant for this test.
        }
    };

    BehaviorRelay.create().subscribe(disposingObserver);
}
 
源代码17 项目: apollo-android   文件: Rx3Apollo.java
/**
 * Wraps a {@code Cancelable} in an RxJava {@code Disposable}.
 *
 * <p>{@code dispose()} delegates to {@code cancel()} and {@code isDisposed()} delegates to
 * {@code isCanceled()}.
 *
 * @param cancelable the cancelable to expose as a disposable
 * @return a disposable view of {@code cancelable}
 */
private static Disposable getRx3Disposable(final Cancelable cancelable) {
  final Disposable bridge = new Disposable() {
    @Override public boolean isDisposed() {
      return cancelable.isCanceled();
    }

    @Override public void dispose() {
      cancelable.cancel();
    }
  };
  return bridge;
}
 
源代码18 项目: apollo-android   文件: Rx3ApolloTest.java
/**
 * Subscribes a test observer to a query call, disposes the subscription and asserts that the
 * observer has completed and reports itself as disposed.
 */
@Test
public void callIsCanceledWhenDisposed() throws Exception {
  server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

  final EpisodeHeroNameQuery query = new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE));
  final TestObserver<Response<EpisodeHeroNameQuery.Data>> observer = new TestObserver<>();
  final Disposable subscription = Rx3Apollo
      .from(apolloClient.query(query))
      .subscribeWith(observer);

  subscription.dispose();

  observer.assertComplete();
  assertThat(observer.isDisposed()).isTrue();
}
 
源代码19 项目: apollo-android   文件: Rx3ApolloTest.java
/**
 * Subscribes a test observer to a prefetch call observed on a {@code TestScheduler}, disposes
 * the subscription and asserts that the observer has not completed and reports itself as
 * disposed.
 */
@Test
public void prefetchIsCanceledWhenDisposed() throws Exception {
  server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

  final EpisodeHeroNameQuery query = new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE));
  final TestObserver<EpisodeHeroNameQuery.Data> observer = new TestObserver<>();
  final Disposable subscription = Rx3Apollo
      .from(apolloClient.prefetch(query))
      .observeOn(new TestScheduler())
      .subscribeWith(observer);

  subscription.dispose();

  observer.assertNotComplete();
  assertThat(observer.isDisposed()).isTrue();
}
 
源代码20 项目: RxReplayingShare   文件: ReplayingShare.java
/**
 * Passes the subscription downstream and then emits the value held in {@code lastSeen}, if
 * there is one and the subscription has not been disposed in the meantime.
 */
@Override public void onSubscribe(Disposable d) {
  downstream.onSubscribe(d);

  // Read the stored value only after downstream has received its subscription.
  final T stored = lastSeen.value;
  if (stored == null || d.isDisposed()) {
    return;
  }
  downstream.onNext(stored);
}
 
/**
 * Checks that three downstream subscriptions result in a single upstream subscription, and
 * that the upstream is disposed only after the last of them has been disposed.
 */
@Test public void refCountToUpstream() {
  final PublishSubject<String> subject = PublishSubject.create();

  // Incremented on every upstream subscribe and decremented on every upstream dispose.
  final AtomicInteger upstreamSubscriptions = new AtomicInteger();
  final Observable<String> shared = subject //
      .doOnSubscribe(ignored -> upstreamSubscriptions.incrementAndGet()) //
      .doOnDispose(() -> upstreamSubscriptions.decrementAndGet()) //
      .compose(ReplayingShare.<String>instance());

  final Disposable first = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  final Disposable second = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  final Disposable third = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  first.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  third.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  second.dispose();
  assertEquals(0, upstreamSubscriptions.get());
}
 
源代码22 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * Checks that the disposable handed to an observer of a fresh {@code BehaviorRelay} is not
 * disposed at first and reports disposed right after {@code dispose()} is called.
 */
@Test
public void innerDisposed() {
    final Observer<Object> disposingObserver = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            assertFalse(subscription.isDisposed());

            subscription.dispose();

            assertTrue(subscription.isDisposed());
        }

        @Override
        public void onNext(Object item) {
            // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
            // Not relevant for this test.
        }

        @Override
        public void onComplete() {
            // Not relevant for this test.
        }
    };

    BehaviorRelay.create().subscribe(disposingObserver);
}
 
源代码23 项目: armeria   文件: RequestContextSingleObserver.java
/**
 * Accepts the upstream disposable if {@code DisposableHelper.validate} approves it, then
 * notifies the wrapped observer with this object as its disposable while the assembly context
 * is pushed.
 */
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(disposable, d)) {
        disposable = d;
        // The pushed context is closed again as soon as the downstream callback returns.
        try (SafeCloseable ignored = assemblyContext.push()) {
            actual.onSubscribe(this);
        }
    }
}
 
源代码24 项目: armeria   文件: RequestContextMaybeObserver.java
/**
 * Accepts the upstream disposable if {@code DisposableHelper.validate} approves it, then
 * notifies the wrapped observer with this object as its disposable while the assembly context
 * is pushed.
 */
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(disposable, d)) {
        disposable = d;
        // The pushed context is closed again as soon as the downstream callback returns.
        try (SafeCloseable ignored = assemblyContext.push()) {
            actual.onSubscribe(this);
        }
    }
}
 
/**
 * Accepts the upstream disposable if {@code DisposableHelper.validate} approves it, then
 * notifies the wrapped observer with this object as its disposable while the assembly context
 * is pushed.
 */
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(disposable, d)) {
        disposable = d;
        // The pushed context is closed again as soon as the downstream callback returns.
        try (SafeCloseable ignored = assemblyContext.push()) {
            actual.onSubscribe(this);
        }
    }
}
 
源代码26 项目: cxf   文件: ReactiveIOInvoker.java
/**
 * Subscribes to the given {@code Single} and connects it to a new asynchronous response.
 *
 * <p>The success value resumes the response; a failure is passed to {@code handleThrowable}.
 *
 * @param inMessage the incoming message the asynchronous response is created for
 * @param single the single whose outcome completes the response
 * @return the asynchronous response bound to {@code single}
 * @throws IllegalStateException if subscribing yields no {@code Disposable}
 */
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);
    final Disposable subscription = single.subscribe(
        value -> response.resume(value),
        failure -> handleThrowable(response, failure));
    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return response;
}
 
源代码27 项目: cxf   文件: ReactiveIOInvoker.java
/**
 * Connects the given {@code Flowable} to a new asynchronous response.
 *
 * <p>When {@code isStreamingSubscriberUsed} returns {@code true}, nothing further is done here.
 * Otherwise the flowable is subscribed directly: emitted values resume the response and a
 * failure is passed to {@code handleThrowable}.
 *
 * @param inMessage the incoming message the asynchronous response is created for
 * @param f the flowable that feeds the response
 * @return the asynchronous response bound to {@code f}
 * @throws IllegalStateException if subscribing yields no {@code Disposable}
 */
protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);
    if (isStreamingSubscriberUsed(f, response, inMessage)) {
        return response;
    }
    final Disposable subscription = f.subscribe(
        value -> response.resume(value),
        failure -> handleThrowable(response, failure));
    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return response;
}
 
源代码28 项目: cxf   文件: ReactiveIOInvoker.java
/**
 * Subscribes to the given {@code Observable} and connects it to a new asynchronous response.
 *
 * <p>Emitted values resume the response; a failure is passed to {@code handleThrowable}.
 *
 * @param inMessage the incoming message the asynchronous response is created for
 * @param obs the observable that feeds the response
 * @return the asynchronous response bound to {@code obs}
 * @throws IllegalStateException if subscribing yields no {@code Disposable}
 */
protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);
    final Disposable subscription = obs.subscribe(
        response::resume,
        failure -> handleThrowable(response, failure));
    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return response;
}
 
源代码29 项目: tutorials   文件: RxJavaLiveVideo.java
/**
 * Starts an endless stream of {@code VideoFrame}s produced on one thread and consumed on
 * another, with a bounded buffer in between.
 *
 * <p>The producer sleeps {@code produceDelay} before creating each next frame; the consumer
 * sleeps {@code consumeDelay} per frame. The buffer uses
 * {@code BackpressureOverflowStrategy.ERROR}, and any error reaching the subscriber runs
 * {@code onError}.
 *
 * <p>Both executors backing the schedulers are created by this method, so they are shut down
 * here once the stream terminates or the returned disposable is disposed; otherwise their
 * worker threads would stay alive indefinitely.
 *
 * @param produceDelay delay passed to {@code sleep} before each frame is produced
 * @param consumeDelay delay passed to {@code sleep} for each consumed frame
 * @param bufferSize capacity of the backpressure buffer
 * @param onError callback run when the stream signals an error
 * @return a disposable that stops the stream and releases its threads
 */
public static Disposable streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError) {
    // Keep references to the executors so they can be shut down when the stream ends.
    final java.util.concurrent.ExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
    final java.util.concurrent.ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
    return Flowable
      .fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> {
          sleep(produceDelay);
          return new VideoFrame(videoFrame.getNumber() + 1);
      }))
      .subscribeOn(Schedulers.from(producerExecutor), true)
      .onBackpressureBuffer(bufferSize, null, BackpressureOverflowStrategy.ERROR)
      .observeOn(Schedulers.from(consumerExecutor))
      // Runs after termination or cancellation; shutdown() lets in-flight work finish.
      .doFinally(() -> {
          producerExecutor.shutdown();
          consumerExecutor.shutdown();
      })
      .subscribe(item -> {
          sleep(consumeDelay);
      }, throwable -> {
          onError.run();
      });
}
 
源代码30 项目: mobius   文件: RxMobiusLoop.java
/**
 * Turns a stream of events into a stream of models by running a Mobius loop per subscription.
 *
 * <p>On subscribe, a loop is started from {@code startModel} (with {@code startEffects} when
 * they are non-null). Models observed on the loop are emitted downstream, upstream events are
 * dispatched into the loop, and an upstream error is emitted downstream wrapped in an
 * {@code UnrecoverableIncomingException}. Cancelling the emitter disposes the loop and then the
 * upstream subscription.
 *
 * @param upstream the stream of events to feed into the loop
 * @return an observable source of the models produced by the loop
 */
@Override
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
  return Observable.create(
      emitter -> {
        final MobiusLoop<M, E, ?> loop =
            startEffects == null
                ? loopFactory.startFrom(startModel)
                : loopFactory.startFrom(startModel, startEffects);

        // Models coming out of the loop go straight downstream.
        loop.observe(model -> emitter.onNext(model));

        // Events coming from upstream go into the loop; upstream failures end the stream.
        final Disposable eventsDisposable =
            upstream.subscribe(
                event -> loop.dispatchEvent(event),
                failure -> emitter.onError(new UnrecoverableIncomingException(failure)));

        emitter.setCancellable(
            () -> {
              loop.dispose();
              eventsDisposable.dispose();
            });
      });
}