下面列出了 io.reactivex.Observable#fromIterable() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Demonstrates per-item concurrency with {@code flatMap}: every beer is wrapped in its
 * own inner Observable that is subscribed on the computation scheduler, passed through
 * {@code matureBeer}, and then printed together with the name of the delivering thread.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    List<Beer> beers = loadCellar(); // populate the beer collection

    Observable.fromIterable(beers)
        .flatMap(beer -> Observable.just(beer)
            .subscribeOn(Schedulers.computation()) // replace computation() with newThread()
            .map(single -> matureBeer(single)))
        .subscribe(beer -> System.out.println("Subscriber got "
            + beer.name + " on "
            + Thread.currentThread().getName()));

    // Just to keep the program running while the scheduler threads do their work.
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Restore the interrupt status instead of swallowing it, so that any code
        // running after this point can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Builds an Observable over the cached contributions: one {@link Pair} per key of
 * {@code _contributionMap}, combining a newly created {@link Contributor} with the
 * value of {@code System.currentTimeMillis()} at the moment the pair is created.
 *
 * @return an Observable created from the list of contributor/timestamp pairs
 */
private Observable<Pair<Contributor, Long>> _getCachedData() {
    final List<Pair<Contributor, Long>> cached = new ArrayList<>();

    for (String login : _contributionMap.keySet()) {
        final Contributor contributor = new Contributor();
        contributor.login = login;
        contributor.contributions = _contributionMap.get(login);
        cached.add(new Pair<>(contributor, System.currentTimeMillis()));
    }

    return Observable.fromIterable(cached);
}
/**
 * Builds four employee ratings, turns them into an Observable and prints every
 * employee whose rating is at least 7.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Parallel arrays: names[i] is rated ratings[i].
    final String[] names = {"Lilly", "Peter", "Bhakti", "Harmi"};
    final int[] ratings = {6, 5, 9, 9};

    List<EmployeeRating> staff = new ArrayList<>();
    for (int i = 0; i < names.length; i++) {
        EmployeeRating entry = new EmployeeRating();
        entry.setName(names[i]);
        entry.setRating(ratings[i]);
        staff.add(entry);
    }

    Observable.fromIterable(staff)
        .filter(candidate -> candidate.getRating() >= 7)
        .subscribe(star -> System.out.println(
            "Star Employee: " + star.getName() + " Rating : " + star.getRating()));
}
/**
 * Converts an Observable of 100000 integers to a Flowable with
 * {@code BackpressureStrategy.BUFFER}, observes it on the computation scheduler and
 * verifies that the subscriber received exactly the source values, in order.
 */
@Test
public void thenAllValuesAreBufferedAndReceived() {
    // Typed declarations (instead of raw List/Observable) keep the chain free of
    // unchecked conversions.
    List<Integer> testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
    Observable<Integer> observable = Observable.fromIterable(testList);

    TestSubscriber<Integer> testSubscriber = observable
        .toFlowable(BackpressureStrategy.BUFFER)
        .observeOn(Schedulers.computation())
        .test();
    testSubscriber.awaitTerminalEvent();

    // values() exposes the received onNext items with their element type, so no
    // Object-to-int casting over getEvents() is needed.
    List<Integer> receivedInts = testSubscriber.values();
    assertEquals(testList, receivedInts);
}
/**
 * Converts an Observable of the integers 0..99999 to a Flowable with
 * {@code BackpressureStrategy.LATEST}, observes it on the computation scheduler and
 * verifies that fewer values than were emitted arrive, but that the final value
 * (99999) is among them.
 */
@Test
public void whenLatestStrategyUsed_thenTheLastElementReceived() {
    List<Integer> testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
    Observable<Integer> observable = Observable.fromIterable(testList);

    TestSubscriber<Integer> testSubscriber = observable
        .toFlowable(BackpressureStrategy.LATEST)
        .observeOn(Schedulers.computation())
        .test();
    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.values();

    // A bare assertThat(booleanExpression) verifies nothing; the assertion has to be
    // completed with an actual check.
    assertThat(receivedInts.size()).isLessThan(testList.size());
    // IntStream.range(0, 100000) excludes its upper bound, so the last element is 99999.
    assertThat(receivedInts).contains(99999);
}
/**
 * Creates an Observable from the beer list and subscribes with handlers for items,
 * errors and completion.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    List<Beer> cellar = loadCellar(); // populate the beer collection

    System.out.println("== Observable creation from an Iterable");

    Observable.fromIterable(cellar)
        .subscribe(
            System.out::println,                             // each beer goes to stdout
            System.err::println,                             // an error goes to stderr
            () -> System.out.println("Streaming is over"));  // completion notice
}
/**
 * Creates an Observable from the beer list and prints every emitted beer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    List<Beer> cellar = loadCellar(); // populate the beer collection

    // Create the Observable from the List and attach an onNext handler only.
    Observable.fromIterable(cellar)
        .subscribe(System.out::println);
}
/**
 * Runs the same pipeline twice over the beer list, first as a Java 8 Stream and then
 * as an RxJava Observable: skip the first beer, keep the next three, retain those
 * whose country is "USA", and print name and price.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    List<Beer> cellar = loadCellar(); // populate the beer collection

    // === Java 8 Stream
    System.out.println("\n== Iterating over Java 8 Stream");
    cellar.stream()
        .skip(1)
        .limit(3)
        .filter(beer -> "USA".equals(beer.country))
        .map(beer -> beer.name + ": $" + beer.price)
        .forEach(System.out::println);

    // === RxJava Observable
    System.out.println("\n== Subscribing to Observable ");
    Observable.fromIterable(cellar)
        .skip(1)
        .take(3)
        .filter(beer -> "USA".equals(beer.country))
        .map(beer -> beer.name + ": $" + beer.price)
        .subscribe(
            System.out::println,   // onNext
            System.out::println,   // onError (also written to stdout)
            () -> System.out.println("Streaming is complete"),
            subscription -> System.out.println( " !!! Someone just subscribed to the beer stream!!! "));
}
/**
 * Wraps a Callable that produces an Observable in {@code AsyncObservable.deferFuture}
 * and checks, for three separate subscriptions, that the deferred source emits the
 * expected values.
 *
 * <p>NOTE(review): {@code assertResult(1, 2, 3)} presumes {@code counter} is at 0 when
 * this test starts — confirm against the fixture setup.
 */
@Test
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
    // Varargs evaluate left to right, so the list holds three consecutive counter values.
    List<Integer> list = Arrays.asList(
        counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet());
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
        Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
        Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
        for (int i = 1; i < 4; i++) {
            source.test()
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1, 2, 3);
        }
    } finally {
        // Shut the executor down even when an assertion fails, so its worker thread
        // does not outlive the test.
        exec.shutdown();
    }
}
/**
 * Exposes the contents of {@code list} as an Observable.
 *
 * @return an Observable created from {@code list}
 */
public Observable<T> getCurrentList() {
    final Observable<T> items = Observable.fromIterable(list);
    return items;
}
/**
 * Exposes the lines provided by {@code Jabberwocky.lines()} as an Observable.
 *
 * @return an Observable backed by an Iterable that asks {@code Jabberwocky.lines()}
 *         for a new iterator each time {@code iterator()} is called on it
 */
private Observable<String> lines() {
    // The lambda implements Iterable#iterator(), so the source is re-queried on every call.
    final Iterable<String> source = () -> Jabberwocky.lines().iterator();
    return Observable.fromIterable(source);
}
/**
 * Checks whether {@code address} is involved in {@code transaction} and emits the
 * answer as a Boolean.
 *
 * <p>The checks run in a fixed order and the first branch that applies decides the
 * result: signer, aggregate transaction (cosignatures, then inner transactions
 * recursively), public key link, metadata, target address, multisig account
 * modification, account address restriction and finally recipient. A transaction that
 * matches none of these yields {@code false}.
 *
 * <p>Branches that consult {@code namespaceIdsObservable} derive their result from the
 * lists it emits, so the returned Observable emits once per emitted list.
 *
 * @param transaction the transaction to inspect
 * @param address the address to look for
 * @param namespaceIdsObservable source of namespace id lists, consulted when a
 *        transaction may reference its target or recipient by namespace id instead of
 *        an {@link Address} (presumably aliases of {@code address} — confirm with callers)
 * @return an Observable emitting whether the transaction involves the address
 */
public Observable<Boolean> transactionFromAddress(final Transaction transaction, final Address address,
final Observable<List<NamespaceId>> namespaceIdsObservable) {
// A signer (when present) whose address equals the given one is an immediate match.
if (transaction.getSigner().filter(s -> s.getAddress().equals(address)).isPresent()) {
return Observable.just(true);
}
if (transaction instanceof AggregateTransaction) {
final AggregateTransaction aggregateTransaction = (AggregateTransaction) transaction;
// Any cosigner with the given address is a match.
if (aggregateTransaction.getCosignatures().stream()
.anyMatch(c -> c.getSigner().getAddress().equals(address))) {
return Observable.just(true);
}
// Recursion: evaluate every inner transaction, keep only the positive answers and
// emit the first one, falling back to false when no inner transaction matches.
Observable<Transaction> innerTransactionObservable = Observable
.fromIterable(aggregateTransaction.getInnerTransactions());
return innerTransactionObservable
.flatMap(t -> this.transactionFromAddress(t, address, namespaceIdsObservable).filter(a -> a))
.first(false).toObservable();
}
if (transaction instanceof PublicKeyLinkTransaction) {
// Compare against the address created from the linked public key and the
// transaction's network type.
return Observable.just(Address
.createFromPublicKey(((PublicKeyLinkTransaction) transaction).getLinkedPublicKey().toHex(),
transaction.getNetworkType()).equals(address));
}
if (transaction instanceof MetadataTransaction) {
MetadataTransaction metadataTransaction = (MetadataTransaction) transaction;
return Observable.just(metadataTransaction.getTargetAddress().equals(address));
}
if (transaction instanceof TargetAddressTransaction) {
TargetAddressTransaction targetAddressTransaction = (TargetAddressTransaction) transaction;
// A target that is an Address is compared directly; any other target is looked
// up in the emitted namespace id lists.
if (targetAddressTransaction.getTargetAddress() instanceof Address) {
return Observable.just(targetAddressTransaction.getTargetAddress().equals(address));
}
return namespaceIdsObservable
.map(namespaceIds -> namespaceIds.contains(targetAddressTransaction.getTargetAddress()));
}
if (transaction instanceof MultisigAccountModificationTransaction) {
MultisigAccountModificationTransaction multisigAccountModificationTransaction = (MultisigAccountModificationTransaction) transaction;
// The address matches when it appears among either the additions or the deletions.
if (multisigAccountModificationTransaction.getAddressAdditions().stream()
.anyMatch(a -> a.equals(address))) {
return Observable.just(true);
}
return Observable.just(
multisigAccountModificationTransaction.getAddressDeletions().stream().anyMatch(a -> a.equals(address)));
}
if (transaction instanceof AccountAddressRestrictionTransaction) {
AccountAddressRestrictionTransaction accountAddressRestrictionTransaction = (AccountAddressRestrictionTransaction) transaction;
// First look for the address itself among the restriction additions and deletions.
if (accountAddressRestrictionTransaction.getRestrictionAdditions().contains(address)) {
return Observable.just(true);
}
if (accountAddressRestrictionTransaction.getRestrictionDeletions().contains(address)) {
return Observable.just(true);
}
// Otherwise match when any emitted namespace id appears among the additions or deletions.
return namespaceIdsObservable.flatMap(namespaceIds -> {
if (namespaceIds.stream().anyMatch(
namespaceId -> accountAddressRestrictionTransaction.getRestrictionAdditions()
.contains(namespaceId))) {
return Observable.just(true);
}
if (namespaceIds.stream().anyMatch(
namespaceId -> accountAddressRestrictionTransaction.getRestrictionDeletions()
.contains(namespaceId))) {
return Observable.just(true);
}
return Observable.just(false);
});
}
if (transaction instanceof RecipientTransaction) {
RecipientTransaction recipientTransaction = (RecipientTransaction) transaction;
// A NamespaceId recipient is looked up in the emitted namespace id lists; any
// other recipient is compared with the address directly.
if (recipientTransaction.getRecipient() instanceof NamespaceId) {
return namespaceIdsObservable
.map(namespaceIds -> namespaceIds.contains(recipientTransaction.getRecipient()));
}
return Observable.just(recipientTransaction.getRecipient().equals(address));
}
// No branch applied to this transaction.
return Observable.just(false);
}
/**
 * Exposes the departments returned by {@code departmentDaoImpl.getDepartments()} as
 * an Observable.
 *
 * @return an Observable created from the DAO's departments
 */
@Override
public Observable<Department> getDeptsRx() {
    return Observable.fromIterable(departmentDaoImpl.getDepartments());
}
/**
 * Exposes the departments returned by {@code departmentDaoImpl.getDepartments()} as
 * an Observable.
 *
 * @return an Observable created from the DAO's departments
 */
@Override
public Observable<Department> getDeptsRx() {
    return Observable.fromIterable(departmentDaoImpl.getDepartments());
}
/**
 * Exposes the departments returned by {@code departmentDaoImpl.getDepartments()} as
 * an Observable.
 *
 * @return an Observable created from the DAO's departments
 */
@Override
public Observable<Department> getDeptsRx() {
    return Observable.fromIterable(departmentDaoImpl.getDepartments());
}
/**
 * Exposes the contents of {@code DATA} as an Observable.
 *
 * @return an Observable created from {@code DATA}
 */
Observable<Post> findAll() {
    final Observable<Post> posts = Observable.fromIterable(DATA);
    return posts;
}
/**
 * Exposes the values returned by {@code numberGenerator.numbers()} as an Observable.
 *
 * @return an Observable created from the generator's numbers
 */
public Observable<Integer> numbers() {
    final Observable<Integer> generated = Observable.fromIterable(numberGenerator.numbers());
    return generated;
}
/**
 * Exposes the values returned by {@code numberGenerator.numbers(upUntil)} as an
 * Observable.
 *
 * @param upUntil forwarded unchanged to {@code numberGenerator.numbers(int)}
 * @return an Observable created from the generator's numbers
 */
public Observable<Long> numbers(int upUntil) {
    final Observable<Long> generated = Observable.fromIterable(numberGenerator.numbers(upUntil));
    return generated;
}
/**
 * Exposes the list returned by {@code stringGenerator.randomStringList()} as an
 * Observable.
 *
 * @return an Observable created from the generated string list
 */
public Observable<String> elements() {
    final Observable<String> generated = Observable.fromIterable(stringGenerator.randomStringList());
    return generated;
}
/**
 * Creates an Observable over the two fixed strings "hello" and "world".
 *
 * @return an {@link Observable} created from an immutable list holding "hello"
 *         followed by "world"
 */
public static Observable<String> hello() {
    // The Guava ImmutableList is an implementation detail; callers only see the Observable.
    return Observable.fromIterable(ImmutableList.of("hello", "world"));
}