io.reactivex.Observable#range ( )源码实例Demo

下面列出了io.reactivex.Observable#range ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
void zip_test3() {
    Integer[] numbers = {1, 2, 13, 34, 15, 17};
    String[] fruits = {"苹果", "梨", "李子", "荔枝",
            "芒果"};

    Observable<Integer> source1 = Observable.fromArray(numbers);
    Observable<String> source2 = Observable.fromArray(fruits);
    Observable<Integer> source3 = Observable.range(10, 3);

    Observable.zip(source1, source2, source3,
            (value1, value2, value3) -> value1 + ":" + value2 + "*" + value3)
              .subscribe(item -> System.out.println("we got: " + item + "  from the Observable"),
                      throwable -> System.out.println("异常-> " + throwable.getMessage()),
                      () -> System.out.println("Emission completed"));
}
 
private static void utilities(){
    Observable<String> obs = Observable.just("one")
            .flatMap(s -> Observable.fromArray(s.split("")));

    obs.delay(5, TimeUnit.MILLISECONDS)
            .subscribe(System.out::print);         //prints: one
    pauseMs(10);

    System.out.println();

    Observable source = Observable.range(1,5);
    Disposable disposable = source.subscribe();
    Observable.using(
            () -> disposable,
            x -> source,
            y -> System.out.println("Disposed: " + y) //prints: Disposed: DISPOSED
    )
    .delay(10, TimeUnit.MILLISECONDS)
    .subscribe(System.out::print);                   //prints: 12345
    pauseMs(25);
}
 
private static void observableBlocking2(){
    Observable<Integer> obs = Observable.range(1,5);

    List<Double> list = new ArrayList<>();
    obs.filter(i -> i % 2 == 0)
            .doOnNext(System.out::println)  //prints 2 and 4
            .map(Math::sqrt)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribe(d -> {
                if(list.size() == 1){
                    list.remove(0);
                }
                list.add(d);
            });
    System.out.println(list);   //prints: []

    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(list);   //prints: [2.0]
}
 
private static void observableBlocking1(){
    Observable<Integer> obs = Observable.range(1,5);

    Double d2 = obs.filter(i -> i % 2 == 0)
                   .doOnNext(System.out::println)  //prints 2 and 4
                   .map(Math::sqrt)
                   .delay(100, TimeUnit.MILLISECONDS)
                   .blockingLast();
    System.out.println(d2);   //prints: 2.0

    List<Double> list = new ArrayList<>();
    obs.filter(i -> i % 2 == 0)
       .doOnNext(System.out::println)  //prints 2 and 4
       .map(Math::sqrt)
       .delay(100, TimeUnit.MILLISECONDS)
       .subscribe(d -> {
            if(list.size() == 1){
                list.remove(0);
            }
            list.add(d);
       });
    System.out.println(list);   //prints: []
}
 
@Test
void take_skip_test() {
    Observable<Integer> range = Observable.range(1, 5);
    range.take(3)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // [1, 2, 3]

    range.skip(3)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // [4, 5]
    range.skip(5)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // []


    range.takeLast(2)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // [4, 5]
    range.skipLast(2)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // [1, 2, 3]

    range.takeUntil(x -> x == 3)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // [1, 2, 3]
    range.takeWhile(x -> x != 3)
         .collect(ArrayList::new, List::add)
         .subscribe(System.out::println);  // [1, 2]

    range.all(x -> x != 4)
         .subscribe(System.out::println);    // [false]
    range.contains(4)
         .subscribe(System.out::println);         // [true]
}
 
private static void disposable2(){
    Observable source = Observable.range(1,5);
    Disposable disposable = source.subscribe();
    Observable<Integer> obs = Observable.using(
            () -> disposable,
            r -> source,
            r -> System.out.println("Disposed: " + r) //prints: Disposed: DISPOSED
    );
    List<Double> list = new ArrayList<>();
    obs.filter(i -> i % 2 == 0)
            .doOnNext(System.out::println)  //prints 2 and 4
            .map(Math::sqrt)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribe(d -> {
                if(list.size() == 1){
                    list.remove(0);
                }
                list.add(d);
            });
    System.out.println(list);                  //prints: []
    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(list);                  //prints: [2.0]
}
 
private static void disposable1(){
    Observable<Integer> obs = Observable.range(1,5);

    List<Double> list = new ArrayList<>();
    Disposable disposable =
            obs.filter(i -> i % 2 == 0)
                    .doOnNext(System.out::println)  //prints 2 and 4
                    .map(Math::sqrt)
                    .delay(100, TimeUnit.MILLISECONDS)
                    .subscribe(d -> {
                        if(list.size() == 1){
                            list.remove(0);
                        }
                        list.add(d);
                    });
    System.out.println(list);   //prints: []
    System.out.println(disposable.isDisposed()); //prints: false
    disposable.dispose();

    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println(disposable.isDisposed());  //prints: true
    disposable.dispose();

    System.out.println(list);   //prints: [2.0]

}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Observable<Integer> observable = Observable.range(1, 50);

	observable.take(5).subscribe(new Observer<Integer>() {

		@Override
		public void onComplete() {
			// TODO Auto-generated method stub
			System.out.println(Thread.currentThread().getName() + " finished reading of items");

		}

		@Override
		public void onError(Throwable throwable) {
			// TODO Auto-generated method stub
			System.out.println(Thread.currentThread().getName() + " finished with exception");

		}

		@Override
		public void onNext(Integer value) {
			// TODO Auto-generated method stub
			System.out.println(Thread.currentThread().getName() + " read item:-"+value);

		}

		@Override
		public void onSubscribe(Disposable arg0) {
			// TODO Auto-generated method stub

		}
	});

}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Observable<Integer> observable=Observable.range(10, 5);
	observable.subscribe(number->System.out.println(number));
}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Integer[] numbers = { 1, 2, 13, 34, 12, 10 };
	String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya" };

	Observable<Integer> source1 = Observable.fromArray(numbers);
	Observable<String> source2 = Observable.fromArray(fruits);
	Observable<Integer> source3 = Observable.range(30, 3);
	Observable.zip(source1, source2, source3, (value1, value2, value3) -> {

		return value1 + ":" + value2 + "*" + value3;
	}).subscribe(new Observer<String>() {

		@Override
		public void onComplete() {
			// TODO Auto-generated method stub
			System.out.println("zipping completed successfully");

		}

		@Override
		public void onError(Throwable throwable) {
			// TODO Auto-generated method stub
			throwable.printStackTrace();

		}

		@Override
		public void onNext(String value) {
			// TODO Auto-generated method stub
			System.out.println(value);
		}

		@Override
		public void onSubscribe(Disposable arg0) {
			// TODO Auto-generated method stub

		}
	});
}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Observable source1=	Observable.range(1,5);
	Observable source2=Observable.just(10,20,30);
	Observable.concat(source1,source2).subscribe(new Consumer<Integer>() {

		@Override
		public void accept(Integer value) throws Exception {
			// TODO Auto-generated method stub
			System.out.println(value);
			
		}
	});

}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Observable<Integer> observable = Observable.range(10, 9);
	observable.take(3).count().subscribe(item -> System.out.println("emitted " + item + " items"));
	observable.take(3).subscribe(new Observer<Integer>() {

		@Override
		public void onComplete() {
			// TODO Auto-generated method stub
			System.out.println("collected  items successfully");

		}

		@Override
		public void onError(Throwable throwable) {
			// TODO Auto-generated method stub
			throwable.printStackTrace();
		}

		@Override
		public void onNext(Integer value) {
			// TODO Auto-generated method stub
			System.out.println("collected item: " + value);

		}

		@Override
		public void onSubscribe(Disposable disposable) {
			// TODO Auto-generated method stub

		}
	});
}
 
源代码13 项目: akarnokd-misc   文件: NotInFirst.java
@Test
public void test() {
    Observable<Integer> obs1 = Observable.range(1, 10);
    Observable<Integer> obs2 = Observable.range(5, 10);
    
    obs1.collect(HashSet::new, (a, b) -> a.add(b))
    .flatMapObservable(set -> 
        obs2.filter(set::contains)
    )
    .subscribe(System.out::println);
}
 
源代码14 项目: tutorials   文件: RxJavaHooksUnitTest.java
@Test
public void givenObservable_whenAssembled_shouldExecuteTheHook() {

    RxJavaPlugins.setOnObservableAssembly(observable -> {
        hookCalled = true;
        return observable;
    });

    Observable.range(1, 10);
    assertTrue(hookCalled);
}
 
源代码15 项目: tutorials   文件: FlowableIntegrationTest.java
@Test public void whenMissingStrategyUsed_thenException() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}
 
源代码16 项目: tutorials   文件: FlowableIntegrationTest.java
@Test public void  whenErrorStrategyUsed_thenExceptionIsThrown() {
        Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Observable<Integer> observable=Observable.range(10, 5);
	observable.subscribe(number->System.out.println(number));
}