下面列出了怎么用io.reactivex.FlowableSubscriber的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(String[] args) {
MathFlowable.averageDouble(Flowable.range(10, 9)).subscribe(new FlowableSubscriber() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("completed successfully");
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Object value) {
// TODO Auto-generated method stub
System.out.println("average:-" + value);
}
@Override
public void onSubscribe(Subscription subscription) {
// TODO Auto-generated method stub
subscription.request(1);
}
});
}
public static <T> Subscriber<T> wrap(
Subscriber<T> downstream, CurrentTraceContext contextScoper, TraceContext assembled) {
if (downstream instanceof FlowableSubscriber) {
return new TraceContextFlowableSubscriber<>((FlowableSubscriber<T>) downstream,
contextScoper, assembled);
}
return new TraceContextSubscriber<>(downstream, contextScoper, assembled);
}
@OnClick(R.id.go)
public void go() {
Editable url = urlView.getText();
if (TextUtils.isEmpty(url)) {
Toast.makeText(this, "url is empty", Toast.LENGTH_SHORT);
return;
}
RxEasyHttp.get(url.toString(), new RxEasyStringConverter())
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(@NonNull Subscription subscription) throws Exception {
dialog.show();
body.setText("");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
dialog.show();
body.setText("");
}
@Override
public void onNext(String response) {
body.setText(response);
}
@Override
public void onError(Throwable t) {
body.setText(t.toString());
}
@Override
public void onComplete() {
dialog.cancel();
}
});
}
@OnClick(R.id.submit)
public void submit() {
Editable content = comment.getText();
if (TextUtils.isEmpty(content)) {
Toast.makeText(this, "comment is empty", Toast.LENGTH_SHORT);
return;
}
EasyRequestParams params = new EasyRequestParams();
params.put("content", content.toString());
String url = "http://book.km.com/app/index.php?c=version&a=feedback";
RxEasyHttp.post(url, params, new RxEasyCustomConverter<PostEntity>() {
@Override
public void doNothing() {
// 防止范型类型擦除引起范型类型不能正确获取问题.
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<PostEntity>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
dialog.show();
}
@Override
public void onNext(PostEntity entity) {
Toast.makeText(RxPostActivity.this, "提交成功", Toast.LENGTH_LONG).show();
result.setText("status : " + entity.getStatus() + "\n" +
"message : " + entity.getMessage());
}
@Override
public void onError(Throwable t) {
Toast.makeText(RxPostActivity.this, "提交失败", Toast.LENGTH_LONG).show();
result.setText(t.getMessage());
dialog.cancel();
}
@Override
public void onComplete() {
dialog.cancel();
}
});
}
@Override
public void onClick(View v) {
// OkHttpClient okHttpClient = new OkHttpClient();
// Request request = new Request.Builder()
// .url("bd_logo1_31bdc765.png")
// .build();
// Call call= okHttpClient.newCall(request);
// call.enqueue(new Callback()
// {
// @Override
// public void onFailure(Call call, IOException e) {
//
// }
//
// @Override
// public void onResponse(Call call, final Response response) throws IOException {
// mPlatform.execute(new Runnable() {
// @Override
// public void run() {
// InputStream inputStream = response.body().byteStream();
// Bitmap bitmap= BitmapFactory.decodeStream(inputStream);
// image.setImageBitmap(bitmap);
// }
// });
//
// }
//
// });
OkHttpClient okHttpClient = new OkHttpClient();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://ss0.bdstatic.com/5aV1bjqh_Q23odCf/static/superman/img/logo/")
.addConverterFactory(BitmapConverterFactory.create())
.client(okHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
UrlService service = retrofit.create(UrlService.class);
// Call<Bitmap> call = service.getImage();
// call.enqueue(new retrofit2.Callback<Bitmap>() {
// @Override
// public void onResponse(Call<Bitmap> call, final retrofit2.Response<Bitmap> response) {
// Bitmap bitmap = Bitmap.createBitmap(response.body(), 0, 0, response.body().getWidth(), response.body().getHeight() / 2);
// image.setImageBitmap(bitmap);
// }
//
// @Override
// public void onFailure(Call<Bitmap> call, Throwable t) {
//
// }
// });
service.getRXImage()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Bitmap>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Bitmap response) {
Bitmap bitmap = Bitmap.createBitmap(response, 0, 0, response.getWidth(), response.getHeight() / 2);
image.setImageBitmap(bitmap);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
TraceContextFlowableSubscriber(
FlowableSubscriber<T> downstream, CurrentTraceContext contextScoper,
TraceContext assembled) {
super(downstream, contextScoper, assembled);
}