下面列出了io.reactivex.Observable#subscribe ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String... args) {
Observable<String> stream = Observable.create(subscriber -> {
// Emit items
subscriber.onNext("Black Canary");
subscriber.onNext("Catwoman");
subscriber.onNext("Elektra");
// Notify the completion
subscriber.onComplete();
});
stream
.subscribe(
i -> System.out.println("Received: " + i),
err -> System.out.println("BOOM"),
() -> System.out.println("Completion")
);
}
@Override
public void importDataS(String text) {
Observable<Boolean> observable = ReplaceRuleManager.importReplaceRule(text);
if (observable != null) {
observable.subscribe(new MyObserver<Boolean>() {
@Override
public void onNext(Boolean aBoolean) {
mView.refresh();
mView.toast("导入成功");
}
@Override
public void onError(Throwable e) {
mView.toast("格式不对");
}
});
} else {
mView.toast("导入失败");
}
}
public static void main(String[] args) {
Observable<String> month_observable = Observable.create(new
ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
// TODO Auto-generated method stub
try {
String[] monthArray = { "Jan", "Feb", "Mar",
"Apl", "May", "Jun", "July", "Aug",
"Sept", "Oct","Nov", "Dec" };
List<String> months = Arrays.asList(monthArray);
for (String month : months) {
emitter.onNext(month);
}
emitter.onComplete();
} catch (Exception e) {
// TODO: handle exception
emitter.onError(e);
}
}
});
month_observable.subscribe(s -> System.out.println(s));
}
public static void main(String[] args) {
String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May"};
List<String> months = Arrays.asList(monthArray);
Observable<Object> observable = Observable.create(subscriber -> {
try {
System.out.println("creating ");
for (String data : months) {
subscriber.onNext(data);
}
subscriber.onComplete();
} catch (Exception e) {
// TODO: handle exception
subscriber.onError(e);
}
});
observable.subscribe(item -> System.out.println("month:-" + item));
observable.subscribe(item -> System.out.println("month:-" + item));
}
public static void main(String[] args) {
Observable<String> source =
Observable.create(
data-> {
System.out.println(" ** Starting Emitting **");
data.onNext("One");
data.onNext("Two");
data.onNext("Three");
data.onNext("Four");
data.onNext("Five");
});
//("One","Two","Three","Four","Five");
//first observer
source.filter(data->data.contains("o"))
.subscribe(data -> System.out.println("Observer 1 Received:" + data));
//second observer
source.subscribe(data -> System.out.println("Observer 2 Received:" + data));
}
public DeleteButtonView(Context context, @Nullable AttributeSet attrs) {
super(context, attrs);
mContext = context;
mTimer = new Timer();
Observable<Integer> observable = Observable.create(e -> mEmitter = e);
Disposable subscribe = observable
.subscribe(integer -> {
if (integer == 1) {
int index = mEditText.getSelectionStart();
if (index > 0) {
Editable editable = mEditText.getText();
editable.delete(index - 1, index);
}
} else {
mTimer.cancel();
}
});
}
@Override
protected void onActivityResult(int requestCode, int resultCode, Intent data) {
super.onActivityResult(requestCode, resultCode, data);
if (requestCode == REQUEST_QR && resultCode == RESULT_OK && null != data) {
String result = data.getStringExtra("result");
Observable<List<BookSourceBean>> observable = BookSourceManager.importSource(result);
if (observable != null) {
observable.subscribe(new MyObserver<List<BookSourceBean>>() {
@SuppressLint("DefaultLocale")
@Override
public void onNext(List<BookSourceBean> bookSourceBeans) {
if (bookSourceBeans.size() > 1) {
toast(String.format("导入成功%d个书源, 显示第一个", bookSourceBeans.size()));
setText(bookSourceBeans.get(0));
} else if (bookSourceBeans.size() == 1) {
setText(bookSourceBeans.get(0));
} else {
toast("未导入");
}
}
@Override
public void onError(Throwable e) {
toast(e.getLocalizedMessage());
}
});
} else {
toast("导入失败");
}
}
}
public static void main(String[] args) {
Observable<String> files = getHeroesNames();
files.subscribe(value -> System.out.println("Subscriber 1: " + value),
Throwable::printStackTrace);
files.subscribe(value -> System.out.println("Subscriber 2: " + value),
Throwable::printStackTrace);
}
private void doSomeWork() {
Observable<List<String>> buffered = getObservable().buffer(3, 1);
// 3 means, it takes max of three from its start index and create list
// 1 means, it jumps one step every time
// so the it gives the following list
// 1 - one, two, three
// 2 - two, three, four
// 3 - three, four, five
// 4 - four, five
// 5 - five
buffered.subscribe(getObserver());
}
@Test
public void consumerTest4(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, false);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final List<String> subscribeList = new ArrayList<>();
final String subscribed = "subscribed";
final Consumer<Object> onSubscribe = new Consumer<Object>() {
@Override
public void accept(final Object t) {
subscribeList.add(subscribed);
}
};
final List<String> completeList = new ArrayList<>();
final List<String> errorList = new ArrayList<>();
observable.subscribe(onNext, onError(errorList), onComplete(completeList, tracer), onSubscribe);
assertEquals(5, result.size());
assertEquals(1, completeList.size());
assertTrue(completeList.contains(COMPLETED));
assertEquals(1, subscribeList.size());
assertTrue(subscribeList.contains(subscribed));
assertTrue(errorList.isEmpty());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertNull(tracer.scopeManager().active());
}
public static void main(String[] args) {
Observable observable = Observable.create(observer -> {
observer.onNext("I am Hot Observable " + Math.random()*100);
observer.onComplete();
});
observable.subscribe(consumer -> System.out.println("message:-" + consumer));
observable.subscribe(consumer -> System.out.println("message:-" + consumer));
}
@Test
public void test() {
Subject<String> subject = PublishSubject.create();
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);
Subject<String> multicast = BehaviorSubject.create();
observable.subscribe(multicast);
Disposable first = multicast.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
Disposable second = multicast.subscribe(System.out::println); // "zero, one"
subject.onNext("two"); // "zero, one, two"
second.dispose();
}
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Observable<Integer> zip = Observable
.zip(getDemoObservable(), getDemoObservable(), (i, i2) -> 0)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io());
final long startTime = System.currentTimeMillis();
zip.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
bindDisposable(d);
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Logger.d("--->> onCompleted bind time " + (System.currentTimeMillis() - startTime));
long delayTime = BuildConfig.SPLASH_TIME - (System.currentTimeMillis() - startTime);
if (delayTime < 0) {
delayTime = 0;
}
new WeakHandler(message -> {//TODO refactor
mView.enterHomeActivity();
return true;
}).sendEmptyMessageDelayed(0, delayTime);
}
});
}
@Test
public void testObservable() {
Observable observable = RxQuery.observable(mockQuery.getQuery());
observable.subscribe((Observer) this);
assertLatchCountedDown(latch, 2);
assertEquals(1, receivedChanges.size());
assertEquals(0, receivedChanges.get(0).size());
assertNull(error);
latch = new CountDownLatch(1);
receivedChanges.clear();
publisher.setQueryResult(listResult);
publisher.publish();
assertLatchCountedDown(latch, 5);
assertEquals(1, receivedChanges.size());
assertEquals(2, receivedChanges.get(0).size());
assertEquals(0, completedCount.get());
//Unsubscribe?
// receivedChanges.clear();
// latch = new CountDownLatch(1);
// assertLatchCountedDown(latch, 5);
//
// assertEquals(1, receivedChanges.size());
// assertEquals(3, receivedChanges.get(0).size());
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Observable<String> observable = Observable.empty();
observable.subscribe(item -> System.out.println("we got" + item),
error -> System.out.print(error),
()->System.out.print("I am Done!! Completed normally"));
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Observable<Long> observable=Observable.timer(1,TimeUnit.SECONDS,Schedulers.computation()).flatMap(i->Observable.just(12l,13l,300l,400l));
observable.subscribe(new Observer<Long>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub00
System.out.println("completed");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
throwable.printStackTrace();
}
@Override
public void onNext(Long value) {
// TODO Auto-generated method stub
System.out.println(value);
}
@Override
public void onSubscribe(Disposable arg0) {
// TODO Auto-generated method stub
System.out.println("subscribe");
}
});
try {
Thread.sleep(80000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Observable<Long>observable=Observable.rangeLong(1l,3l);
Observer< Long> observer=new Observer<Long>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("on complete");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
throwable.printStackTrace();
}
@Override
public void onNext(Long value) {
// TODO Auto-generated method stub
System.out.println(""+value);
}
@Override
public void onSubscribe(Disposable disposable) {
// TODO Auto-generated method stub
System.out.println(disposable.isDisposed());
disposable.dispose();
}
};
observable.subscribe(observer);
}
@Test
public void netTest() {
final Retrofit mRetrofit = initRetrofit();
GankApi mGankApi = mRetrofit.create(GankApi.class);
Observable<GankAndroid> mAndroidObservable = mGankApi.getData("10/1");
mAndroidObservable.subscribe(gankAndroid -> {
doAssert(gankAndroid.getResults().get(0));
}, throwable -> System.out.println("fail"));
}
@Test
public void test() throws Exception {
RxJavaAssemblyTracking.enable();
System.out.println("start");
PublishSubject<String> stringsEmitter = PublishSubject.create();
Observable<String> combineSource = stringsEmitter
.doOnSubscribe(d -> System.out.println("Connected!"))
.replay(1)
.refCount()
.doOnSubscribe(d -> System.out.println("Subscribed!"))
.doOnDispose(() -> System.out.println("Disposed!"));
CompositeDisposable c = new CompositeDisposable();
c.add(
combineSource
//.subscribeOn(Schedulers.io())
.subscribe((string) -> System.out.println("A1:" + string))
);
c.add(
combineSource
//.subscribeOn(Schedulers.io())
.subscribe((string) -> System.out.println("B1:" + string))
);
stringsEmitter.onNext("s1");
Thread.sleep(100);
c.clear();
Thread.sleep(100);
System.out.println("---");
combineSource
//.subscribeOn(Schedulers.io())
//.subscribe((string) -> System.out.println("A2:" + string));
.subscribe((string) -> System.out.println("A2:" + string), Throwable::printStackTrace, () -> System.out.println("Done A2"));
combineSource
//.subscribeOn(Schedulers.io())
.subscribe((string) -> System.out.println("B2:" + string), Throwable::printStackTrace, () -> System.out.println("Done B2"));
Thread.sleep(1000);
stringsEmitter.onNext("s2");
Thread.sleep(1000);
stringsEmitter.onNext("s3");
stringsEmitter.onNext("s4");
}
private void doSomeWork() {
Car car = new Car();
Observable<String> brandDeferObservable = car.brandDeferObservable();
car.setBrand("BMW"); // Even if we are setting the brand after creating Observable
// we will get the brand as BMW.
// If we had not used defer, we would have got null as the brand.
brandDeferObservable
.subscribe(getObserver());
}