下面列出了io.reactivex.Observable#fromArray ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
void zip_test3() {
Integer[] numbers = {1, 2, 13, 34, 15, 17};
String[] fruits = {"苹果", "梨", "李子", "荔枝",
"芒果"};
Observable<Integer> source1 = Observable.fromArray(numbers);
Observable<String> source2 = Observable.fromArray(fruits);
Observable<Integer> source3 = Observable.range(10, 3);
Observable.zip(source1, source2, source3,
(value1, value2, value3) -> value1 + ":" + value2 + "*" + value3)
.subscribe(item -> System.out.println("we got: " + item + " from the Observable"),
throwable -> System.out.println("异常-> " + throwable.getMessage()),
() -> System.out.println("Emission completed"));
}
private static void transforming(){
Observable<String> obs = Observable.fromArray("one", "two");
obs.map(s -> s.contains("w") ? 1 : 0).forEach(System.out::print); //prints: 01
List<String> os = new ArrayList<>();
List<String> noto = new ArrayList<>();
obs.flatMap(s -> Observable.fromArray(s.split("")))
.groupBy(s -> "o".equals(s) ? "o" : "noto")
.subscribe(g -> g.subscribe(s -> {
if (g.getKey().equals("o")) {
os.add(s);
} else {
noto.add(s);
}
}));
System.out.println(os); //prints: [o, o]
System.out.println(noto); //prints: [n, e, t, w]
pauseMs(100);
}
@Test
public void createUniFromRx() {
// tag::uni-create[]
Completable completable = Completable.complete();
Single<String> single = Single.just("hello");
Maybe<String> maybe = Maybe.just("hello");
Maybe<String> emptyMaybe = Maybe.empty();
Observable<String> observable = Observable.fromArray("a", "b", "c");
Flowable<String> flowable = Flowable.fromArray("a", "b", "c");
Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);
Uni<String> uniFromPublisher = Uni.createFrom().publisher(flowable);
// end::uni-create[]
assertThat(uniFromCompletable.await().indefinitely()).isNull();
assertThat(uniFromSingle.await().indefinitely()).isEqualTo("hello");
assertThat(uniFromMaybe.await().indefinitely()).isEqualTo("hello");
assertThat(uniFromEmptyMaybe.await().indefinitely()).isNull();
assertThat(uniFromObservable.await().indefinitely()).isEqualTo("a");
assertThat(uniFromFlowable.await().indefinitely()).isEqualTo("a");
assertThat(uniFromPublisher.await().indefinitely()).isEqualTo("a");
}
@Test
public void createMultiFromRx() {
// tag::multi-create[]
Completable completable = Completable.complete();
Single<String> single = Single.just("hello");
Maybe<String> maybe = Maybe.just("hello");
Maybe<String> emptyMaybe = Maybe.empty();
Observable<String> observable = Observable.fromArray("a", "b", "c");
Flowable<String> flowable = Flowable.fromArray("a", "b", "c");
Multi<Void> multiFromCompletable = Multi.createFrom()
.converter(MultiRxConverters.fromCompletable(), completable);
Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
Multi<String> multiFromObservable = Multi.createFrom()
.converter(MultiRxConverters.fromObservable(), observable);
Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);
Multi<String> multiFromPublisher = Multi.createFrom().publisher(flowable);
// end::multi-create[]
assertThat(multiFromCompletable.collectItems().first().await().indefinitely()).isNull();
assertThat(multiFromSingle.collectItems().first().await().indefinitely()).isEqualTo("hello");
assertThat(multiFromMaybe.collectItems().first().await().indefinitely()).isEqualTo("hello");
assertThat(multiFromEmptyMaybe.collectItems().first().await().indefinitely()).isNull();
assertThat(multiFromObservable.collectItems().asList().await().indefinitely()).containsExactly("a", "b", "c");
assertThat(multiFromFlowable.collectItems().asList().await().indefinitely()).containsExactly("a", "b", "c");
assertThat(multiFromPublisher.collectItems().asList().await().indefinitely()).containsExactly("a", "b", "c");
}
@Test
void combineLatest_test() {
Integer[] numbers = {1, 2, 13, 34, 15, 17};
String[] fruits = {"苹果", "梨", "李子", "荔枝",
"芒果"};
Observable<Integer> source1 = Observable.fromArray(numbers);
Observable<String> source2 = Observable.fromArray(fruits);
Observable.combineLatest(source2, source1, (item1, item2) -> item1 + item2)
.subscribe(item -> System.out.println("we got: " + item + " from the Observable"),
throwable -> System.out.println("异常-> " + throwable.getMessage()),
() -> System.out.println("Emission completed"));
}
@Test
void amb_test() {
Integer[] numbers = {1, 2, 13, 34, 15, 17};
String[] fruits = {"苹果", "梨", "李子", "荔枝",
"芒果"};
Observable<Integer> source1 = Observable.fromArray(numbers).delay(1, TimeUnit.SECONDS);
Observable<String> source2 = Observable.fromArray(fruits);
Observable.ambArray(source1, source2)
.forEach(System.out::println);
}
@Test
public void testAverageObservable() {
Observable<Integer> numberObservable = Observable.fromArray(1, 2, 3, 4, 5);
MathObservable.averageDouble(numberObservable)
.subscribe(new Observer<Double>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Double aDouble) {
System.out.println("Average: " + aDouble);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Integer[] numbers = { 1, 2, 13, 34, 12, 10 };
String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya" };
Observable<Integer> source1 = Observable.fromArray(numbers);
Observable<String> source2 = Observable.fromArray(fruits);
Observable<Integer> source3 = Observable.range(30, 3);
Observable.zip(source1, source2, source3, (value1, value2, value3) -> {
return value1 + ":" + value2 + "*" + value3;
}).subscribe(new Observer<String>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("zipping completed successfully");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
throwable.printStackTrace();
}
@Override
public void onNext(String value) {
// TODO Auto-generated method stub
System.out.println(value);
}
@Override
public void onSubscribe(Disposable arg0) {
// TODO Auto-generated method stub
}
});
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Observable names1 = Observable.fromArray(new String[] { "name1", "abc", "xyz", "lmn" });
Observable names2 = Observable.fromArray(new String[] { "name1", "abc", "xyz", "lmn" });
Observable.sequenceEqual(names1, names2).subscribe(new SingleObserver<Boolean>() {
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
System.out.println(throwable.getMessage());
}
@Override
public void onSubscribe(Disposable disposable) {
// TODO Auto-generated method stub
System.out.println("is disposed:-" + disposable.isDisposed());
}
@Override
public void onSuccess(Boolean value) {
// TODO Auto-generated method stub
if (value) {
System.out
.println("successfully finished comparision of two sequence and both sequences are equal");
}
else
System.out.println("successfully finished comparision of two sequence and both sequences are NOT equal");
}
});
}
private static void generalExample(RxJava2ObsHub rxJava2Hub) {
Observable src1 = Observable.fromArray(new Integer[]{1, 3, 5, 7, 11, 13});
Observable src2 = Observable.interval(1, TimeUnit.SECONDS);
rxJava2Hub.addUpstream("src1", src1);
rxJava2Hub.getPub("src1").subscribe(o -> {
System.out.println("consumer1 (src1) got: " + o);
});
rxJava2Hub.getPub("src1").subscribe(System.out::println);
rxJava2Hub.getPub("src1.1").subscribe(o -> {
System.out.println("consumer1 (src1.1) got: " + o);
});
rxJava2Hub.addUpstream("src1.1", src1.repeat(1));
rxJava2Hub.addUpstream("src2", src2.buffer(Integer.MAX_VALUE));
rxJava2Hub.getPub("src1").subscribe(o -> {
System.out.println("consumer2 (src1) got: " + o);
});
rxJava2Hub.getPub("src1.1").subscribe(o -> {
System.out.println("consumer2 (src1.1) got: " + o);
});
rxJava2Hub.getPub("src2").subscribe(o -> {
System.out.println("consumer2 (src2) got: " + o);
});
new Thread(() -> {
try {
Thread.sleep(5000);
rxJava2Hub.addUpstream("src1.1", Observable.interval(1, TimeUnit.SECONDS));
Thread.sleep(5000);
rxJava2Hub.clearUpstream();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.concat(aObservable, bObservable)
.subscribe(getObserver());
}
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.merge(aObservable, bObservable)
.subscribe(getObserver());
}
private Observable<PlexItem> mediaTypes(Library lib) {
return Observable.fromArray(new MediaType[]{
MediaType.builder()
.title("Artists")
.type(Type.ARTIST)
.mediaKey("8")
.libraryKey(lib.key())
.libraryId(lib.uuid())
.uri(lib.uri())
.build(),
MediaType.builder()
.title("Albums")
.type(Type.ALBUM)
.mediaKey("9")
.libraryKey(lib.key())
.libraryId(lib.uuid())
.uri(lib.uri())
.build(),
MediaType.builder()
.title("Tracks")
.type(Type.TRACK)
.mediaKey("10")
.libraryKey(lib.key())
.libraryId(lib.uuid())
.uri(lib.uri())
.build()
});
}
@Test
public void test() {
Observable<Integer> source = Observable.fromArray(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
source
.compose(skipSingleZero())
.test()
.assertResult(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
}
@Test
public void test2() {
Observable<Integer> source = Observable.fromArray(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
source
.compose(skipSingleZero())
.test()
.assertResult(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
}
@Stream
@Path("stream")
@GET
public Observable<String> helloStream() {
return Observable.fromArray(new String[] {"Hello", "World"});
}
@Stream(MODE.RAW)
@Path("hello-observable")
@GET
public Observable<String> helloObservable() {
return Observable.fromArray("one", "two");
}
@Produces(MediaType.APPLICATION_JSON)
@Path("hello-observable-collect")
@GET
public Observable<String> helloObservableCollect() {
return Observable.fromArray("one", "two");
}
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("hello-observable-sse")
@GET
public Observable<String> helloObservableSse() {
return Observable.fromArray("one", "two");
}
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 13, 34, 12, 10 };
Observable<Integer> source1 = Observable.fromArray(numbers);
source1.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) throws Exception {
// TODO Auto-generated method stub
// 1, 2, 13, 34, 12, 10
int sum = 0;
return value1 + value2;
}
}).subscribe(new MaybeObserver<Integer>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("completed2");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
System.out.println(throwable.getMessage());
}
@Override
public void onSubscribe(Disposable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onSuccess(Integer value) {
// TODO Auto-generated method stub
System.out.println(value);
}
});
}