io.reactivex.Observable#fromIterable ( )源码实例Demo

下面列出了io.reactivex.Observable#fromIterable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: rxjava2   文件: ParallelStreams.java
public static void main(String[] args) {

        List<Beer> beers = loadCellar();  // populate the beer collection

        Observable<Beer> observableBeers = Observable.fromIterable(beers);

        observableBeers
                .flatMap(beer -> Observable.just(beer)
                           .subscribeOn(Schedulers.computation())  // replace computation() with newThread()
                           .map(beeer -> matureBeer(beeer))
                 )
                .subscribe(beer -> System.out.println("Subscriber got " +
                               beer.name + " on  " +
                               Thread.currentThread().getName())
                );


        // Just to keep the program running
        try {
            Thread.sleep(5000);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
private Observable<Pair<Contributor, Long>> _getCachedData() {

    List<Pair<Contributor, Long>> list = new ArrayList<>();

    Pair<Contributor, Long> dataWithAgePair;

    for (String username : _contributionMap.keySet()) {
      Contributor c = new Contributor();
      c.login = username;
      c.contributions = _contributionMap.get(username);

      dataWithAgePair = new Pair<>(c, System.currentTimeMillis());
      list.add(dataWithAgePair);
    }

    return Observable.fromIterable(list);
  }
 
源代码3 项目: Spring-5.0-Projects   文件: RxJavaIterableDemo.java
public static void main(String[] args) {

		List<EmployeeRating> employeeList = new ArrayList<EmployeeRating>();

		EmployeeRating employeeRating1 = new EmployeeRating();
		employeeRating1.setName("Lilly");
		employeeRating1.setRating(6);
		employeeList.add(employeeRating1);

		employeeRating1 = new EmployeeRating();
		employeeRating1.setName("Peter");
		employeeRating1.setRating(5);
		employeeList.add(employeeRating1);

		employeeRating1 = new EmployeeRating();
		employeeRating1.setName("Bhakti");
		employeeRating1.setRating(9);
		employeeList.add(employeeRating1);

		employeeRating1 = new EmployeeRating();
		employeeRating1.setName("Harmi");
		employeeRating1.setRating(9);
		employeeList.add(employeeRating1);


		Observable<EmployeeRating> employeeRatingSource = Observable.fromIterable(employeeList);

		employeeRatingSource.filter(employeeRating -> employeeRating.getRating() >=7)
			.subscribe(empRating -> System.out.println("Star Employee: " + empRating.getName() 
				+ " Rating : "+empRating.getRating()));

	}
 
源代码4 项目: tutorials   文件: FlowableIntegrationTest.java
@Test public void thenAllValuesAreBufferedAndReceived() {
    List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());

    assertEquals(testList, receivedInts);
}
 
源代码5 项目: tutorials   文件: FlowableIntegrationTest.java
@Test public void whenLatestStrategyUsed_thenTheLastElementReceived() {
    List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(receivedInts.contains(100000));
}
 
源代码6 项目: rxjava2   文件: ObservableErrorComplete.java
public static void main(String[] args) {

        List<Beer> beers = loadCellar();  // populate the beer collection

        System.out.println("== Observable creation from an Iterable");

        Observable<Beer> observableBeer = Observable.fromIterable(beers);

        observableBeer.subscribe(
                beer -> System.out.println(beer),
                error -> System.err.println(error),
                () -> System.out.println("Streaming is over")
        );
    }
 
源代码7 项目: rxjava2   文件: HelloObservable.java
public static void main(String[] args) {

        List<Beer> beers = loadCellar();  // populate the beer collection

        Observable<Beer> observableBeer =
                Observable.fromIterable(beers);   // Create Observable from a List

        observableBeer.subscribe(
                beer -> System.out.println(beer)    // onNext handler
        );
    }
 
源代码8 项目: rxjava2   文件: StreamVsObservable.java
public static void main(String[] args) {

        List<Beer> beers = loadCellar();  // populate the beer collection

        // === Java 8 Stream
        System.out.println("\n== Iterating over Java 8 Stream");

        beers.stream()
                .skip(1)
                .limit(3)
                .filter(b -> "USA".equals(b.country))
                .map(b -> b.name + ": $" + b.price)
                .forEach(beer -> System.out.println(beer));

        // === RxJava Observable

        Observable<Beer> observableBeer = null;

        System.out.println("\n== Subscribing to Observable ");

        observableBeer = Observable.fromIterable(beers);

        observableBeer
                .skip(1)
                .take(3)
                .filter(b -> "USA".equals(b.country))
                .map(b -> b.name + ": $" + b.price)
                .subscribe(
                        beer -> System.out.println(beer),
                        err ->  System.out.println(err),
                        () ->   System.out.println("Streaming is complete"),
                        disposable -> System.out.println( " !!! Someone just subscribed to the beer stream!!! ")
        );
    }
 
@Test
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() { 
    List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
    Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
    for (int i = 1; i < 4; i++) {
        source.test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertResult(1, 2, 3);
    }

    exec.shutdown();
}
 
源代码10 项目: adamant-android   文件: ObservableRxList.java
public Observable<T> getCurrentList() {
    return Observable.fromIterable(list);
}
 
源代码11 项目: reactive-code-workshop   文件: RxApplication.java
private Observable<String> lines() {
    return Observable.fromIterable(() -> Jabberwocky.lines().iterator());
}
 
源代码12 项目: symbol-sdk-java   文件: ListenerBase.java
public Observable<Boolean> transactionFromAddress(final Transaction transaction, final Address address,
    final Observable<List<NamespaceId>> namespaceIdsObservable) {
    if (transaction.getSigner().filter(s -> s.getAddress().equals(address)).isPresent()) {
        return Observable.just(true);
    }
    if (transaction instanceof AggregateTransaction) {
        final AggregateTransaction aggregateTransaction = (AggregateTransaction) transaction;
        if (aggregateTransaction.getCosignatures().stream()
            .anyMatch(c -> c.getSigner().getAddress().equals(address))) {
            return Observable.just(true);
        }
        //Recursion...
        Observable<Transaction> innerTransactionObservable = Observable
            .fromIterable(aggregateTransaction.getInnerTransactions());

        return innerTransactionObservable
            .flatMap(t -> this.transactionFromAddress(t, address, namespaceIdsObservable).filter(a -> a))
            .first(false).toObservable();
    }
    if (transaction instanceof PublicKeyLinkTransaction) {
        return Observable.just(Address
            .createFromPublicKey(((PublicKeyLinkTransaction) transaction).getLinkedPublicKey().toHex(),
                transaction.getNetworkType()).equals(address));
    }

    if (transaction instanceof MetadataTransaction) {
        MetadataTransaction metadataTransaction = (MetadataTransaction) transaction;
        return Observable.just(metadataTransaction.getTargetAddress().equals(address));
    }

    if (transaction instanceof TargetAddressTransaction) {
        TargetAddressTransaction targetAddressTransaction = (TargetAddressTransaction) transaction;
        if (targetAddressTransaction.getTargetAddress() instanceof Address) {
            return Observable.just(targetAddressTransaction.getTargetAddress().equals(address));
        }
        return namespaceIdsObservable
            .map(namespaceIds -> namespaceIds.contains(targetAddressTransaction.getTargetAddress()));
    }

    if (transaction instanceof MultisigAccountModificationTransaction) {
        MultisigAccountModificationTransaction multisigAccountModificationTransaction = (MultisigAccountModificationTransaction) transaction;
        if (multisigAccountModificationTransaction.getAddressAdditions().stream()
            .anyMatch(a -> a.equals(address))) {
            return Observable.just(true);
        }

        return Observable.just(
            multisigAccountModificationTransaction.getAddressDeletions().stream().anyMatch(a -> a.equals(address)));

    }

    if (transaction instanceof AccountAddressRestrictionTransaction) {
        AccountAddressRestrictionTransaction accountAddressRestrictionTransaction = (AccountAddressRestrictionTransaction) transaction;
        if (accountAddressRestrictionTransaction.getRestrictionAdditions().contains(address)) {
            return Observable.just(true);
        }
        if (accountAddressRestrictionTransaction.getRestrictionDeletions().contains(address)) {
            return Observable.just(true);
        }
        return namespaceIdsObservable.flatMap(namespaceIds -> {
            if (namespaceIds.stream().anyMatch(
                namespaceId -> accountAddressRestrictionTransaction.getRestrictionAdditions()
                    .contains(namespaceId))) {
                return Observable.just(true);
            }
            if (namespaceIds.stream().anyMatch(
                namespaceId -> accountAddressRestrictionTransaction.getRestrictionDeletions()
                    .contains(namespaceId))) {
                return Observable.just(true);
            }
            return Observable.just(false);
        });
    }

    if (transaction instanceof RecipientTransaction) {
        RecipientTransaction recipientTransaction = (RecipientTransaction) transaction;
        if (recipientTransaction.getRecipient() instanceof NamespaceId) {
            return namespaceIdsObservable
                .map(namespaceIds -> namespaceIds.contains(recipientTransaction.getRecipient()));
        }
        return Observable.just(recipientTransaction.getRecipient().equals(address));

    }

    return Observable.just(false);
}
 
@Override
public Observable<Department> getDeptsRx() {
	Observable<Department> depts= Observable.fromIterable(departmentDaoImpl.getDepartments());
	return depts;
}
 
@Override
public Observable<Department> getDeptsRx() {
	Observable<Department> depts= Observable.fromIterable(departmentDaoImpl.getDepartments());
	return depts;
}
 
@Override
public Observable<Department> getDeptsRx() {
	Observable<Department> depts= Observable.fromIterable(departmentDaoImpl.getDepartments());
	return depts;
}
 
Observable<Post> findAll() {
    return Observable.fromIterable(DATA);
}
 
源代码17 项目: Android-ReactiveProgramming   文件: DataManager.java
public Observable<Integer> numbers() {
    return Observable.fromIterable(numberGenerator.numbers());
}
 
源代码18 项目: Android-ReactiveProgramming   文件: DataManager.java
public Observable<Long> numbers(int upUntil) {
    return Observable.fromIterable(numberGenerator.numbers(upUntil));
}
 
源代码19 项目: Android-ReactiveProgramming   文件: DataManager.java
public Observable<String> elements() {
    return Observable.fromIterable(stringGenerator.randomStringList());
}
 
源代码20 项目: tutorials   文件: RxHelloWorld.java
/**
 * @return an {@link Observable} that emits events "hello" and "world" before completing.
 */
public static Observable<String> hello() {
    // Guava ImmutableList class is an implementation detail.
    List<String> values = ImmutableList.of("hello", "world");
    return Observable.fromIterable(values);
}