下面列出了 io.reactivex.Observable#range() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Zips a number source, a fruit-name source and a short integer range into
 * formatted strings and prints each combined item, any error message, and a
 * completion notice.
 */
@Test
void zip_test3() {
    Integer[] numberValues = {1, 2, 13, 34, 15, 17};
    String[] fruitNames = {"苹果", "梨", "李子", "荔枝", "芒果"};

    Observable<Integer> numberSource = Observable.fromArray(numberValues);
    Observable<String> fruitSource = Observable.fromArray(fruitNames);
    // The range source contributes only three values, starting at 10.
    Observable<Integer> rangeSource = Observable.range(10, 3);

    Observable<String> zipped = Observable.zip(
            numberSource,
            fruitSource,
            rangeSource,
            (number, fruit, offset) -> number + ":" + fruit + "*" + offset);

    zipped.subscribe(
            combined -> System.out.println("we got: " + combined + " from the Observable"),
            error -> System.out.println("异常-> " + error.getMessage()),
            () -> System.out.println("Emission completed"));
}
/**
 * Demonstrates two utility operators: {@code delay}, applied to the letters of
 * "one", and {@code using}, which ties a resource (here a {@code Disposable})
 * to the lifetime of an observable built from it.
 *
 * <p>Relies on {@code pauseMs}, defined elsewhere in the enclosing class, to
 * keep the calling thread alive long enough for the delayed emissions.
 */
private static void utilities(){
    Observable<String> letters = Observable.just("one")
            .flatMap(s -> Observable.fromArray(s.split("")));
    letters.delay(5, TimeUnit.MILLISECONDS)
            .subscribe(System.out::print);      //prints: one
    pauseMs(10);
    System.out.println();

    // Declared with its element type instead of as a raw Observable so the
    // calls below are checked by the compiler rather than being unchecked.
    Observable<Integer> source = Observable.range(1,5);
    Disposable disposable = source.subscribe();
    Observable.using(
            () -> disposable,                   // resource supplier
            x -> source,                        // observable created from the resource
            y -> System.out.println("Disposed: " + y) //prints: Disposed: DISPOSED
        )
        .delay(10, TimeUnit.MILLISECONDS)
        .subscribe(System.out::print);          //prints: 12345
    pauseMs(25);
}
/**
 * Shows that a delayed, non-blocking subscription has not delivered anything
 * when {@code subscribe} returns, and that the result only becomes visible
 * after the calling thread has waited longer than the delay.
 */
private static void observableBlocking2(){
    Observable<Integer> obs = Observable.range(1,5);
    List<Double> list = new ArrayList<>();
    obs.filter(i -> i % 2 == 0)
            .doOnNext(System.out::println)      //prints 2 and 4
            .map(Math::sqrt)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribe(d -> {
                // Keep only the most recent value in the list.
                if(list.size() == 1){
                    list.remove(0);
                }
                list.add(d);
            });
    System.out.println(list);                   //prints: []
    try {
        // Wait longer than the 100 ms delay so the emissions can arrive.
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently swallowing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    System.out.println(list);                   //prints: [2.0]
}
/**
 * Contrasts a blocking read ({@code blockingLast}) with a plain
 * {@code subscribe} on the same delayed pipeline: the blocking call waits for
 * the final value, while the list filled by the subscriber is printed
 * immediately after subscribing.
 */
private static void observableBlocking1(){
    // Pipeline definition shared by both subscriptions below; each
    // subscription runs it again from the start.
    Observable<Double> evenRoots = Observable.range(1,5)
            .filter(n -> n % 2 == 0)
            .doOnNext(System.out::println)      //prints 2 and 4
            .map(Math::sqrt)
            .delay(100, TimeUnit.MILLISECONDS);

    Double lastRoot = evenRoots.blockingLast();
    System.out.println(lastRoot);               //prints: 2.0

    List<Double> latest = new ArrayList<>();
    evenRoots.subscribe(root -> {
        // Replace the previously stored value, if any, with the new one.
        if (latest.size() == 1) {
            latest.remove(0);
        }
        latest.add(root);
    });
    System.out.println(latest);                 //prints: []
}
/**
 * Walks through the take/skip family of operators on the range 1..5 and
 * prints what each one lets through, followed by the {@code all} and
 * {@code contains} checks.
 */
@Test
void take_skip_test() {
    Observable<Integer> oneToFive = Observable.range(1, 5);

    printCollected(oneToFive.take(3));                  // [1, 2, 3]
    printCollected(oneToFive.skip(3));                  // [4, 5]
    printCollected(oneToFive.skip(5));                  // []
    printCollected(oneToFive.takeLast(2));              // [4, 5]
    printCollected(oneToFive.skipLast(2));              // [1, 2, 3]
    printCollected(oneToFive.takeUntil(x -> x == 3));   // [1, 2, 3]
    printCollected(oneToFive.takeWhile(x -> x != 3));   // [1, 2]

    oneToFive.all(x -> x != 4)
            .subscribe(System.out::println);            // false
    oneToFive.contains(4)
            .subscribe(System.out::println);            // true
}

/** Gathers every item of {@code source} into a list and prints that list. */
private static void printCollected(Observable<Integer> source) {
    source.collect(ArrayList::new, List::add)
            .subscribe(System.out::println);
}
/**
 * Builds an observable with {@code Observable.using}, using a
 * {@code Disposable} as the managed resource, then runs a delayed pipeline on
 * it and prints the collected result before and after waiting for the delay.
 */
private static void disposable2(){
    // Declared with its element type instead of as a raw Observable so the
    // assignment to Observable<Integer> below is not an unchecked conversion.
    Observable<Integer> source = Observable.range(1,5);
    Disposable disposable = source.subscribe();
    Observable<Integer> obs = Observable.using(
            () -> disposable,                   // resource supplier
            r -> source,                        // observable created from the resource
            r -> System.out.println("Disposed: " + r) //prints: Disposed: DISPOSED
    );
    List<Double> list = new ArrayList<>();
    obs.filter(i -> i % 2 == 0)
            .doOnNext(System.out::println)      //prints 2 and 4
            .map(Math::sqrt)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribe(d -> {
                // Keep only the most recent value in the list.
                if(list.size() == 1){
                    list.remove(0);
                }
                list.add(d);
            });
    System.out.println(list);                   //prints: []
    try {
        // Wait longer than the 100 ms delay so the emissions can arrive.
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently swallowing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    System.out.println(list);                   //prints: [2.0]
}
/**
 * Subscribes to a delayed pipeline, disposes the subscription straight away,
 * and prints the disposed state and the collected list before and after
 * waiting past the delay.
 */
private static void disposable1(){
    Observable<Integer> obs = Observable.range(1,5);
    List<Double> list = new ArrayList<>();
    Disposable disposable =
            obs.filter(i -> i % 2 == 0)
                    .doOnNext(System.out::println)  //prints 2 and 4
                    .map(Math::sqrt)
                    .delay(100, TimeUnit.MILLISECONDS)
                    .subscribe(d -> {
                        // Keep only the most recent value in the list.
                        if(list.size() == 1){
                            list.remove(0);
                        }
                        list.add(d);
                    });
    System.out.println(list);                       //prints: []
    System.out.println(disposable.isDisposed());    //prints: false
    disposable.dispose();
    try {
        TimeUnit.MILLISECONDS.sleep(200);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently swallowing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    System.out.println(disposable.isDisposed());    //prints: true
    // Second dispose() on an already disposed subscription.
    disposable.dispose();
    // The subscription was disposed before the 100 ms delay elapsed, so no
    // value should have reached the subscriber: expected output is [].
    System.out.println(list);
}
/**
 * Takes the first five values of the range starting at 1 and reports each
 * observer callback together with the name of the thread it ran on.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observer<Integer> reporter = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            // Nothing to do when the subscription starts.
        }

        @Override
        public void onNext(Integer item) {
            System.out.println(Thread.currentThread().getName() + " read item:-"+item);
        }

        @Override
        public void onError(Throwable failure) {
            System.out.println(Thread.currentThread().getName() + " finished with exception");
        }

        @Override
        public void onComplete() {
            System.out.println(Thread.currentThread().getName() + " finished reading of items");
        }
    };

    Observable<Integer> oneToFifty = Observable.range(1, 50);
    oneToFifty.take(5).subscribe(reporter);
}
/**
 * Prints the five integers produced by a range that starts at 10.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable.range(10, 5)
            .subscribe(System.out::println);
}
/**
 * Zips numbers, fruit names and a three-element range into formatted strings
 * and prints each one, then a completion message.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable<Integer> numberSource = Observable.fromArray(new Integer[] { 1, 2, 13, 34, 12, 10 });
    Observable<String> fruitSource =
            Observable.fromArray(new String[] { "mango", "pineapple", "apple", "mango", "papaya" });
    Observable<Integer> rangeSource = Observable.range(30, 3);

    Observer<String> printer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            // Nothing to do when the subscription starts.
        }

        @Override
        public void onNext(String combined) {
            System.out.println(combined);
        }

        @Override
        public void onError(Throwable failure) {
            failure.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("zipping completed successfully");
        }
    };

    Observable
            .zip(numberSource, fruitSource, rangeSource,
                    (number, fruit, offset) -> number + ":" + fruit + "*" + offset)
            .subscribe(printer);
}
/**
 * Concatenates the range 1..5 with the values 10, 20, 30 and prints every
 * item in emission order.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Both sources carry their element type; the original raw Observable
    // declarations made the concat/subscribe calls unchecked.
    Observable<Integer> source1 = Observable.range(1, 5);
    Observable<Integer> source2 = Observable.just(10, 20, 30);
    Observable.concat(source1, source2)
            .subscribe(value -> System.out.println(value));
}
/**
 * Takes the first three values of a range starting at 10, first printing how
 * many items were taken and then printing each taken item.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable<Integer> firstThree = Observable.range(10, 9).take(3);

    firstThree.count()
            .subscribe(total -> System.out.println("emitted " + total + " items"));

    Observer<Integer> collector = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            // Nothing to do when the subscription starts.
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("collected item: " + item);
        }

        @Override
        public void onError(Throwable failure) {
            failure.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("collected items successfully");
        }
    };
    firstThree.subscribe(collector);
}
/**
 * Prints the values that two ranges have in common: the first range is
 * collected into a set, and the second range is filtered by membership in it.
 */
@Test
public void test() {
    Observable<Integer> first = Observable.range(1, 10);
    Observable<Integer> second = Observable.range(5, 10);

    first
            .collect(HashSet::new, (seen, value) -> seen.add(value))
            // Once the set is complete, keep only the items of the second range it contains.
            .flatMapObservable(seen -> second.filter(seen::contains))
            .subscribe(System.out::println);
}
/**
 * Installs an observable-assembly hook that records its invocation in
 * {@code hookCalled} (a field declared outside this method), creates an
 * observable, and asserts that the hook ran.
 */
@Test
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
    RxJavaPlugins.setOnObservableAssembly(assembled -> {
        // Record the call and hand the observable back unchanged.
        hookCalled = true;
        return assembled;
    });

    // Only the creation matters here; the observable itself is never subscribed.
    Observable.range(1, 10);

    assertTrue(hookCalled);
}
/**
 * Converts a 100000-element observable to a flowable with
 * {@code BackpressureStrategy.MISSING}, observes it on the computation
 * scheduler, and asserts that it terminates with a
 * {@code MissingBackpressureException}.
 */
@Test public void whenMissingStrategyUsed_thenException() {
    // Element types are spelled out; the original used the raw Observable and
    // TestSubscriber types, which made every call in the chain unchecked.
    Observable<Integer> observable = Observable.range(1, 100000);
    TestSubscriber<Integer> subscriber = observable
            .toFlowable(BackpressureStrategy.MISSING)
            .observeOn(Schedulers.computation())
            .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}
/**
 * Converts a 100000-element observable to a flowable with
 * {@code BackpressureStrategy.ERROR}, observes it on the computation
 * scheduler, and asserts that it terminates with a
 * {@code MissingBackpressureException}.
 */
@Test public void whenErrorStrategyUsed_thenExceptionIsThrown() {
    // Element types are spelled out; the original used the raw Observable and
    // TestSubscriber types, which made every call in the chain unchecked.
    Observable<Integer> observable = Observable.range(1, 100000);
    TestSubscriber<Integer> subscriber = observable
            .toFlowable(BackpressureStrategy.ERROR)
            .observeOn(Schedulers.computation())
            .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}
/**
 * Subscribes to a five-element range beginning at 10 and prints each value on
 * its own line.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable<Integer> tenToFourteen = Observable.range(10, 5);
    tenToFourteen.subscribe(System.out::println);
}