类io.reactivex.rxjava3.core.Observable源码实例Demo

下面列出了 io.reactivex.rxjava3.core.Observable 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: mobius   文件: RxMobius.java
/**
 * Registers an {@link ObservableTransformer} that handles every effect assignable to the given
 * class.
 *
 * <p>No two registered effect classes may be assignable to one another: registering a class
 * alongside one of its super- or sub-classes, or registering the same class twice, counts as a
 * collision and is rejected.
 *
 * @param effectClass the class to handle
 * @param effectHandler the effect handler for the given effect class
 * @param <G> the effect class as a type parameter
 * @return this builder
 * @throws IllegalArgumentException if there is a handler collision
 */
public <G extends F> RxMobius.SubtypeEffectHandlerBuilder<F, E> addTransformer(
    final Class<G> effectClass, final ObservableTransformer<G, E> effectHandler) {
  checkNotNull(effectClass);
  checkNotNull(effectHandler);

  // Compare the new class against everything registered so far, in both directions.
  for (Class<?> registered : effectPerformerMap.keySet()) {
    final boolean collides =
        registered.isAssignableFrom(effectClass) || effectClass.isAssignableFrom(registered);
    if (!collides) {
      continue;
    }
    throw new IllegalArgumentException(
        "Effect classes may not be assignable to each other, collision found: "
            + effectClass.getSimpleName()
            + " <-> "
            + registered.getSimpleName());
  }

  // Narrow the shared effect stream to this class, then run it through the handler.
  effectPerformerMap.put(
      effectClass,
      (Observable<F> allEffects) ->
          allEffects
              .ofType(effectClass)
              .compose(effectHandler)
              .doOnError(onErrorFunction.apply(effectHandler)));

  return this;
}
 
源代码2 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that runs the given {@link Consumer} for every effect
 * arriving from upstream. Each invocation is wrapped in a {@link Completable} and flattened into
 * the stream, so the resulting observable emits no items of its own.
 *
 * <p>When {@code scheduler} is non-null each invocation is subscribed on it; when it is
 * {@code null} no {@code subscribeOn} is applied.
 *
 * @param doEffect the {@link Consumer} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the consumer, or {@code null}
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              effect -> {
                final Completable invocation =
                    Completable.fromAction(() -> doEffect.accept(effect));
                if (scheduler == null) {
                  return invocation;
                }
                return invocation.subscribeOn(scheduler);
              })
          .<E>toObservable();
}
 
源代码3 项目: RxLifecycle   文件: MainActivity.java
@Override
protected void onStart() {
    super.onStart();

    Log.d(TAG, "onStart()");

    // The stream is bound to the lifecycle from inside onStart(), so automatic unsubscription
    // should pick the mirror-image event, onStop(), as the moment to unsubscribe.
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(() -> Log.i(TAG, "Unsubscribing subscription from onStart()"))
        .compose(this.<Long>bindToLifecycle())
        .subscribe(num -> Log.i(TAG, "Started in onStart(), running until in onStop(): " + num));
}
 
源代码4 项目: mobius   文件: TransformersTest.java
/**
 * Pushes two effects through a transformer built with {@code Transformers.fromFunction} on the io
 * scheduler, where each effect sleeps for {@code duration(s)}, and waits until the collected
 * results equal {@code expected(effects)}.
 */
@Test
public void processingLongEffectsDoesNotBlockProcessingShorterEffects() {
  final List<String> effects = Arrays.asList("Hello", "Rx");

  PublishSubject<String> upstream = PublishSubject.create();
  Function<String, Integer> sleepyFunction =
      s -> {
        try {
          Thread.sleep(duration(s));
        } catch (InterruptedException ie) {
          // Do not swallow the interruption: restore the flag so the worker thread can see it.
          Thread.currentThread().interrupt();
        }
        return s.length();
      };

  // Results are appended on io-scheduler threads while the test thread polls the list below, so
  // the list must be safe to share across threads.
  final List<Integer> results = java.util.Collections.synchronizedList(new ArrayList<>());
  upstream
      .compose(Transformers.fromFunction(sleepyFunction, Schedulers.io()))
      .subscribe(results::add);

  Observable.fromIterable(effects).subscribe(upstream);

  await().atMost(durationForEffects(effects)).until(() -> results.equals(expected(effects)));
}
 
源代码5 项目: mobius   文件: RxMobiusLoopTest.java
@Test
public void shouldThrowIfStartingALoopWithInitAndStartEffects() throws Exception {
  // An Init that appends a suffix to the model, giving the builder an explicit init step.
  final Init<String, Boolean> suffixingInit =
      new Init<String, Boolean>() {
        @Nonnull
        @Override
        public First<String, Boolean> init(String model) {
          return First.first(model + "-init");
        }
      };
  final MobiusLoop.Builder<String, Integer, Boolean> withInit = builder.init(suffixingInit);

  // Start effects are passed in addition to the Init configured above.
  final ObservableTransformer<Integer, String> transformer =
      RxMobius.loopFrom(withInit, "hi", effects(true));

  // The combination is expected to surface as an error on the resulting stream.
  Observable.just(10)
      .compose(transformer)
      .test()
      .assertError(t -> t.getMessage().contains("cannot pass in start effects"));
}
 
源代码6 项目: RxCache   文件: TestDiskImplWithMoshi.java
private static void testObject(RxCache rxCache) {
    // Store a sample user under the "test" key.
    final User stored = new User();
    stored.name = "tony";
    stored.password = "123456";
    rxCache.save("test", stored);

    // Load the entry back as an Observable and print the fields of every emitted record.
    final Observable<Record<User>> cached = rxCache.load2Observable("test", User.class);
    cached.subscribe(record -> {
        final User loaded = record.getData();
        System.out.println(loaded.name);
        System.out.println(loaded.password);
    });
}
 
源代码7 项目: cxf   文件: JAXRSRxJava3ObservableTest.java
@Test
public void testGetHelloWorldJson() throws Exception {
    final String address = "http://localhost:" + PORT + "/rx3/observable/textJson";

    // The client needs both the JSON provider and the Observable Rx invoker provider.
    final List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new ObservableRxInvokerProvider());

    final Observable<HelloWorldBean> obs = WebClient.create(address, providers)
        .accept("application/json")
        .rx(ObservableRxInvoker.class)
        .get(HelloWorldBean.class);

    // Capture the emitted bean so it can be inspected after the wait below.
    final Holder<HelloWorldBean> holder = new Holder<>();
    final Disposable d = obs.subscribe(v -> holder.value = v);
    if (d == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }

    // Fixed wait before asserting; presumably the response arrives asynchronously.
    Thread.sleep(2000);
    assertEquals("Hello", holder.value.getGreeting());
    assertEquals("World", holder.value.getAudience());
}
 
源代码8 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest3WithError(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, true);

  // One collector per callback so each can be checked on its own.
  final List<Integer> received = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(received);
  final List<String> completions = new ArrayList<>();
  final List<String> errors = new ArrayList<>();
  observable.subscribe(onNext, onError(errors), onComplete(completions, tracer));

  // Only the error callback is expected to have recorded anything.
  assertEquals(0, received.size());
  assertEquals(0, completions.size());
  assertEquals(1, errors.size());

  // Exactly one span is finished and no scope remains active.
  assertEquals(1, tracer.finishedSpans().size());
  assertNull(tracer.scopeManager().active());
}
 
源代码9 项目: RxCache   文件: TestWithSpring.java
public static void main(String[] args) {
    // Obtain the cache from a Spring context built from the annotated Config class.
    final ApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class);
    final RxCache rxCache = (RxCache) ctx.getBean("rxCache");

    // Store a sample user under the "test" key.
    final User stored = new User();
    stored.name = "tony";
    stored.password = "123456";
    rxCache.save("test", stored);

    // Load the entry back as an Observable and print the fields of every emitted record.
    final Observable<Record<User>> cached = rxCache.load2Observable("test", User.class);
    cached.subscribe(record -> {
        final User loaded = record.getData();
        System.out.println(loaded.name);
        System.out.println(loaded.password);
    });
}
 
源代码10 项目: catnip   文件: RestChannel.java
/**
 * Requests the reactions on a message for one emoji and returns the response as a JSON array.
 *
 * @param channelId id of the channel, used as the route's major parameter
 * @param messageId id of the message, passed as the {@code message} route parameter
 * @param emoji     emoji to look up; UTF-8 encoded before being placed in the route
 * @param before    optional {@code before} query value; omitted when {@code null}
 * @param after     optional {@code after} query value; omitted when {@code null}
 * @param limit     {@code limit} query value; only sent when greater than zero
 * @return an Observable of the response payload mapped to a {@link JsonArray}
 */
@Nonnull
@CheckReturnValue
public Observable<JsonArray> getReactionsRaw(@Nonnull final String channelId, @Nonnull final String messageId,
                                             @Nonnull final String emoji, @Nullable final String before,
                                             @Nullable final String after, @Nonnegative final int limit) {
    // Only parameters that were actually supplied end up in the query string.
    final QueryStringBuilder queryBuilder = new QueryStringBuilder();
    if(limit > 0) {
        queryBuilder.append("limit", Integer.toString(limit));
    }
    if(before != null) {
        queryBuilder.append("before", before);
    }
    if(after != null) {
        queryBuilder.append("after", after);
    }
    final String query = queryBuilder.build();
    
    final OutboundRequest request = new OutboundRequest(
            Routes.GET_REACTIONS.withMajorParam(channelId).withQueryString(query),
            Map.of("message", messageId, "emojis", encodeUTF8(emoji)));
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::array);
}
 
源代码11 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Concatenates the cached record for {@code key} with records built from {@code source}, saving
 * every source item to the cache on the way through. Errors are delayed until both parts have
 * run, and records whose data is {@code null} are dropped.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {

    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);

    // Every remote item is written to the cache, then wrapped as a CLOUD-sourced record.
    final Observable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<T>(Source.CLOUD, key, item);
    });

    return Observable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
 
源代码12 项目: Image-Cipher   文件: PixelTraversalController.java
/**
 * Subscribes to the given stream of click points. Every emitted point becomes the new starting
 * point and triggers a pixel traversal.
 *
 * <p>A subscription left over from an earlier call is disposed first, so repeated calls do not
 * leave a stale subscription running with no handle left to cancel it.
 *
 * @param observable source of click points; subscribed on the computation scheduler
 */
void setClickObservable(Observable<Point> observable) {
  logger.info("Setting observable");
  // The field is about to be overwritten, so release the previous subscription while we can.
  if (disposable != null) {
    disposable.dispose();
  }
  disposable = observable.subscribeOn(Schedulers.computation())
      .subscribe(point -> {
        startingPoint = point;
        runPixelTraversal();
      });
}
 
源代码13 项目: catnip   文件: RestWebhook.java
/**
 * Fetches the webhooks of a guild and emits them one at a time.
 *
 * @param guildId id of the guild to query
 * @return an Observable emitting each {@link Webhook} built from the raw response
 */
@Nonnull
@CheckReturnValue
public Observable<Webhook> getGuildWebhooks(@Nonnull final String guildId) {
    return getGuildWebhooksRaw(guildId)
            // Convert the raw payload into webhook entities...
            .map(payload -> mapObjectContents(entityBuilder()::createWebhook).apply(payload))
            // ...then flatten the resulting collection into individual emissions.
            .flatMapIterable(webhooks -> webhooks);
}
 
@Test
public void doNotTimeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofMinutes(1)));

    // An empty source run through the time-limiter transformer must still complete.
    Observable.empty()
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test()
        .assertComplete();

    // Completion is expected to be reported to the limiter as a success.
    then(timeLimiter).should().onSuccess();
}
 
源代码15 项目: claudb   文件: ClauDB.java
/**
 * Writes the current state as RDB to the given stream, blocking until the export has finished.
 *
 * @param output stream receiving the exported data
 */
@Override
public void exportRDB(OutputStream output) {
  // Wrap the export in an Observable so executeOn can decide where it runs,
  // then block the caller until it terminates.
  executeOn(Observable.create(emitter -> {
    getState().exportRDB(output);
    emitter.onComplete();
  })).blockingSubscribe();
}
 
源代码16 项目: RxBasicsKata   文件: CountriesServiceSolvedTest.java
@Test
public void rx_sumPopulationOfCountries() {
    // hint: use "map" operator
    TestObserver<Long> testObserver = countriesService
            .sumPopulationOfCountries(
                    Observable.fromIterable(allCountries),
                    Observable.fromIterable(allCountries))
            .test();

    // Both inputs carry the full country list, so the expected total is twice its population sum.
    final long expectedTotal = CountriesTestProvider.sumPopulationOfAllCountries()
            + CountriesTestProvider.sumPopulationOfAllCountries();
    testObserver.assertResult(expectedTotal);
    testObserver.assertNoErrors();
}
 
源代码17 项目: catnip   文件: RestEmoji.java
/**
 * Requests the emoji list of a guild and returns the response as a JSON array.
 *
 * @param guildId id of the guild, used as the route's major parameter
 * @return an Observable of the response payload mapped to a {@link JsonArray}
 */
@Nonnull
public Observable<JsonArray> listGuildEmojisRaw(@Nonnull final String guildId) {
    final OutboundRequest request = new OutboundRequest(
            Routes.LIST_GUILD_EMOJIS.withMajorParam(guildId),
            Map.of());
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::array);
}
 
/**
 * Tells whether the {@link ObservableResponseConverterFunction} can handle the given
 * {@link Class}, i.e. whether it is assignable to {@link Observable}, {@link Maybe},
 * {@link Single} or {@link Completable}.
 */
private static boolean isSupportedClass(Class<?> clazz) {
    if (Observable.class.isAssignableFrom(clazz)) {
        return true;
    }
    if (Maybe.class.isAssignableFrom(clazz)) {
        return true;
    }
    if (Single.class.isAssignableFrom(clazz)) {
        return true;
    }
    return Completable.class.isAssignableFrom(clazz);
}
 
源代码19 项目: catnip   文件: RestVoice.java
/**
 * Requests the list of voice regions and returns the response as a JSON array.
 *
 * @return an Observable of the response payload mapped to a {@link JsonArray}
 */
@Nonnull
@CheckReturnValue
public Observable<JsonArray> listVoiceRegionsRaw() {
    final OutboundRequest request = new OutboundRequest(Routes.LIST_VOICE_REGIONS, Map.of());
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::array);
}
 
源代码20 项目: resilience4j   文件: ObserverCircuitBreakerTest.java
@Test
public void shouldSubscribeToObservableJust() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    final String firstEvent = "Event 1";
    final String secondEvent = "Event 2";

    // With a permission granted, both items must pass through the operator untouched.
    Observable.just(firstEvent, secondEvent)
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .test()
        .assertResult(firstEvent, secondEvent);

    // The breaker must have seen a success and never an error.
    then(circuitBreaker).should()
        .onSuccess(anyLong(), any(TimeUnit.class));
    then(circuitBreaker).should(never())
        .onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
 
源代码21 项目: RxCache   文件: TestDiskImplWithMoshi.java
private static void testSet(RxCache rxCache) {
    // Build a two-element set of users and store it under the "set" key.
    final Set<User> users = new HashSet<>();

    final User first = new User();
    first.name = "tonySet1";
    first.password = "set1123456";
    users.add(first);

    final User second = new User();
    second.name = "tonySet12";
    second.password = "set12345";
    users.add(second);
    rxCache.save("set", users);

    // Describe Set<User> as a Type so it can be passed to the load call.
    final Type setOfUsers = TypeBuilder
            .newInstance(Set.class)
            .addTypeParam(User.class)
            .build();

    final Observable<Record<Set<User>>> cached = rxCache.load2Observable("set", setOfUsers);

    cached.subscribe(record -> {
        final Set<User> loaded = record.getData();
        if (!Preconditions.isNotBlank(loaded)) {
            return;
        }
        for (User user : loaded) {
            System.out.println(user.name);
            System.out.println(user.password);
        }
    });
}
 
源代码22 项目: catnip   文件: RestGuild.java
/**
 * Sends a guild-embed modification request and returns the response as a JSON object.
 *
 * @param guildId   id of the guild, used as the route's major parameter
 * @param channelId value sent as {@code channel_id} in the request body; may be {@code null}
 * @param enabled   value sent as {@code enabled} in the request body
 * @param reason    optional reason forwarded with the request
 * @return an Observable of the response payload mapped to a {@link JsonObject}
 */
@Nonnull
@CheckReturnValue
public Observable<JsonObject> modifyGuildEmbedRaw(@Nonnull final String guildId, @Nullable final String channelId,
                                                  final boolean enabled, @Nullable final String reason) {
    final OutboundRequest request = new OutboundRequest(
            Routes.MODIFY_GUILD_EMBED.withMajorParam(guildId),
            Map.of(),
            JsonObject.builder().value("channel_id", channelId).value("enabled", enabled).done(),
            reason);
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::object);
}
 
源代码23 项目: resilience4j   文件: ObserverRateLimiterTest.java
@Test
public void shouldDelaySubscription() {
    // The limiter reports that a permission is available after a one-second wait.
    final long waitNanos = Duration.ofSeconds(1).toNanos();
    given(rateLimiter.reservePermission()).willReturn(waitNanos);

    // Wait for termination with a two-second budget.
    Observable.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .awaitDone(2, TimeUnit.SECONDS);
}
 
源代码24 项目: catnip   文件: BasePaginator.java
/**
 * Fetches up to {@link #limit(int) limit} entities and exposes them through
 * the returned {@link Observable}.
 * <br><b>This method will keep all entities in memory</b>: they are first
 * collected into a list, which is then flattened back into individual
 * emissions, so for unbounded pagination it should be avoided.
 *
 * @return An observable emitting the fetched entities (not a list, despite
 * the intermediate collection step).
 */
@Nonnull
@CheckReturnValue
public Observable<T> fetch() {
    // Accumulator filled by forEach below.
    final List<T> list = new ArrayList<>();
    return forEach(list::add)
            // NOTE(review): what forEach emits is not visible here; each of its emissions is
            // replaced by a read-only view of the accumulated list - confirm it emits once.
            .map(__ -> Collections.unmodifiableList(list))
            // Flatten the list back into one emission per entity.
            .flatMapIterable(e -> e);
}
 
源代码25 项目: catnip   文件: RestGuild.java
/**
 * Fetches the invites of a guild and emits them one at a time.
 *
 * @param guildId id of the guild to query
 * @return an Observable emitting each {@link CreatedInvite} built from the raw response
 */
@Nonnull
@CheckReturnValue
public Observable<CreatedInvite> getGuildInvites(@Nonnull final String guildId) {
    return getGuildInvitesRaw(guildId)
            // Convert the raw payload into invite entities...
            .map(payload -> mapObjectContents(entityBuilder()::createCreatedInvite).apply(payload))
            // ...then flatten the resulting collection into individual emissions.
            .flatMapIterable(invites -> invites);
}
 
源代码26 项目: catnip   文件: RestGuild.java
/**
 * Creates a paginator over the audit log of the given guild.
 *
 * @param guildId id of the guild whose audit log is paged through
 * @return a paginator that loads each page via {@code getGuildAuditLogRaw}
 */
public AuditLogPaginator getGuildAuditLog(@Nonnull final String guildId) {
    return new AuditLogPaginator(entityBuilder()) {
        @Nonnull
        @CheckReturnValue
        @Override
        protected Observable<JsonObject> fetchNext(@Nonnull final RequestState<AuditLogEntry> requestState,
                                                   @Nullable final String previousId,
                                                   @Nonnegative final int pageSize) {
            // The "user" and "type" arguments are read from the request state's extras.
            return getGuildAuditLogRaw(
                    guildId,
                    requestState.extra("user"),
                    previousId,
                    requestState.extra("type"),
                    pageSize);
        }
    };
}
 
源代码27 项目: claudb   文件: ClauDB.java
/**
 * Loads RDB data from the given stream into the current state, blocking until the import has
 * finished.
 *
 * @param input stream supplying the data to import
 */
@Override
public void importRDB(InputStream input) {
  // Wrap the import in an Observable so executeOn can decide where it runs,
  // then block the caller until it terminates.
  executeOn(Observable.create(emitter -> {
    getState().importRDB(input);
    emitter.onComplete();
  })).blockingSubscribe();
}
 
源代码28 项目: catnip   文件: RestGuild.java
/**
 * Requests the prune count of a guild and returns the response as a JSON object.
 *
 * @param guildId id of the guild, used as the route's major parameter
 * @param days    value sent as the {@code days} query parameter
 * @return an Observable of the response payload mapped to a {@link JsonObject}
 */
@Nonnull
@CheckReturnValue
public Observable<JsonObject> getGuildPruneCountRaw(@Nonnull final String guildId, @Nonnegative final int days) {
    final String query = new QueryStringBuilder()
            .append("days", String.valueOf(days))
            .build();
    final OutboundRequest request = new OutboundRequest(
            Routes.GET_GUILD_PRUNE_COUNT.withMajorParam(guildId).withQueryString(query),
            Map.of());
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::object);
}
 
源代码29 项目: RxRelay   文件: BehaviorRelayTest.java
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    BehaviorRelay<String> src = BehaviorRelay.createDefault("null"); // FIXME was plain null which is not allowed

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        final InOrder ordered = inOrder(mock);
        final String value = String.valueOf(turn);
        src.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Take just the relay's first element, duplicate it, and forward every signal to the mock.
        src.firstElement()
            .toObservable()
            .flatMap(t1 -> Observable.just(t1 + ", " + t1))
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    mock.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    mock.onError(e);
                }

                @Override
                public void onComplete() {
                    mock.onComplete();
                }
            });

        // Exactly the duplicated value followed by completion, and never an error.
        ordered.verify(mock).onNext(value + ", " + value);
        ordered.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
 
源代码30 项目: catnip   文件: RestGuild.java
/**
 * Sends a role-creation request for a guild and returns the response as a JSON object.
 *
 * @param guildId  id of the guild, used as the route's major parameter
 * @param roleData role definition; its JSON form is sent as the request body
 * @param reason   optional reason forwarded with the request
 * @return an Observable of the response payload mapped to a {@link JsonObject}
 */
@Nonnull
@CheckReturnValue
public Observable<JsonObject> createGuildRoleRaw(@Nonnull final String guildId, @Nonnull final RoleData roleData,
                                                 @Nullable final String reason) {
    final OutboundRequest request = new OutboundRequest(
            Routes.CREATE_GUILD_ROLE.withMajorParam(guildId),
            Map.of(),
            roleData.toJson(),
            reason);
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::object);
}
 
 类所在包
 同包方法