下面列出了io.reactivex.subscribers.ResourceSubscriber#dispose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String[] args) {
// TODO Auto-generated method stub
ResourceSubscriber<Long> resourceSubscriber = new ResourceSubscriber<Long>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("Its Done!!!");
dispose();
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
throwable.printStackTrace();
dispose();
}
@Override
public void onNext(Long value_long) {
// TODO Auto-generated method stub
if(value_long==7)
dispose();
System.out.println("value :-"+value_long);
}
@Override
protected void onStart() {
// TODO Auto-generated method stub
request(Long.MAX_VALUE);
}
};
Flowable.rangeLong(5, 4).subscribe(resourceSubscriber);
resourceSubscriber.dispose();
}
public static void main(String[] args) {
Observable<Beer> beerData = BeerServer.getData(); // No streaming just yet
ResourceSubscriber<Beer> beerSubscriber = new ResourceSubscriber<Beer>() {
@Override
public void onNext(Beer beer) {
System.out.println("Got "+ beer);
}
@Override
public void onError(Throwable throwable) {
System.err.println("In Observer.onError(): " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("*** The stream is over ***");
}
};
// Converting an Observable to Flowable
beerData
.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(beerSubscriber); // Streaming starts here
// If the subscriber is less than 21 year old, cancel subscription
beerSubscriber.dispose();
}