类io.reactivex.subscribers.DefaultSubscriber源码实例Demo

下面列出了怎么用io.reactivex.subscribers.DefaultSubscriber的API类实例代码及写法,或者点击链接到github查看源代码。

@OnClick(R.id.btn_demo_timing_1)
public void btn1_RunSingleTaskAfter2s() {
  _log(String.format("A1 [%s] --- BTN click", _getCurrentTimestamp()));

  Flowable.timer(2, TimeUnit.SECONDS) //
      .subscribe(
          new DefaultSubscriber<Long>() {
            @Override
            public void onNext(Long number) {
              _log(String.format("A1 [%s]     NEXT", _getCurrentTimestamp()));
            }

            @Override
            public void onError(Throwable e) {
              Timber.e(e, "something went wrong in TimingDemoFragment example");
            }

            @Override
            public void onComplete() {
              _log(String.format("A1 [%s] XXX COMPLETE", _getCurrentTimestamp()));
            }
          });
}
 
@OnClick(R.id.btn_demo_timing_4)
public void btn4_RunTask5Times_IntervalOf3s() {
  _log(String.format("D4 [%s] --- BTN click", _getCurrentTimestamp()));

  Flowable.interval(3, TimeUnit.SECONDS)
      .take(5)
      .subscribe(
          new DefaultSubscriber<Long>() {
            @Override
            public void onNext(Long number) {
              _log(String.format("D4 [%s]     NEXT", _getCurrentTimestamp()));
            }

            @Override
            public void onError(Throwable e) {
              Timber.e(e, "something went wrong in TimingDemoFragment example");
            }

            @Override
            public void onComplete() {
              _log(String.format("D4 [%s] XXX COMPLETE", _getCurrentTimestamp()));
            }
          });
}
 
public static void main(String[] args) {
	// TODO Auto-generated method stub
	Flowable< Long> flowable=Flowable.rangeLong(2, 12);
	flowable.subscribe(new DefaultSubscriber<Long>() {

		@Override
		public void onComplete() {
			// TODO Auto-generated method stub
			System.out.println("Its Done!!!");

		}

		@Override
		public void onError(Throwable throwable) {
			// TODO Auto-generated method stub
			throwable.printStackTrace();

		}

		@Override
		public void onNext(Long value) {
			// TODO Auto-generated method stub
			if(value==4)
				cancel();
			System.out.println("value:-" + value);

		}

		@Override
		protected void onStart() {
			// TODO Auto-generated method stub
			request(5);
		}

	});

}
 
源代码4 项目: C9MJ   文件: LivePlayPresenterImpl.java
@Override
    public void getDanmuDetail(String roomid, String live_type) {
        if (live_type.equals("panda")) {
            RetrofitHelper.getPandaHelper().create(LiveAPI.class)
                    .getPandaChatroom(roomid)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new DefaultSubscriber<LivePandaBean>() {
                        @Override
                        public void onComplete() {

                        }

                        @Override
                        public void onError(Throwable e) {
//                            view.showError("弹幕服务器接口已过期,请刷新直播列表!");
                            view.showError(e.getMessage());
                        }

                        @Override
                        public void onNext(LivePandaBean detailPandaBean) {
                            if (detailPandaBean.getErrno() == 0) {
                                view.updateChatDetail(detailPandaBean);
                            } else {
                                view.showError(detailPandaBean.getErrmsg());
                            }
                        }
                    });
        } else {
            view.showError("直播平台:" + live_type + "!不是熊猫TV的弹幕池!");
        }
    }
 
@Test
public void streaming() {
    final WebClient client = WebClient.of(rule.httpUri() + "/streaming");
    final AtomicBoolean isFinished = new AtomicBoolean();
    client.get("/json").subscribe(new DefaultSubscriber<HttpObject>() {
        final ImmutableList.Builder<HttpObject> received = new Builder<>();

        @Override
        public void onNext(HttpObject httpObject) {
            received.add(httpObject);
        }

        @Override
        public void onError(Throwable t) {
            throw new Error("Should not reach here.");
        }

        @Override
        public void onComplete() {
            final Iterator<HttpObject> it = received.build().iterator();
            final ResponseHeaders headers = (ResponseHeaders) it.next();
            assertThat(headers.status()).isEqualTo(HttpStatus.OK);
            assertThat(headers.contentType()).isEqualTo(MediaType.JSON_SEQ);
            // JSON Text Sequences: *(Record Separator[0x1E] JSON-text Line Feed[0x0A])
            assertThat(((HttpData) it.next()).array())
                    .isEqualTo(new byte[] { 0x1E, '\"', 'a', '\"', 0x0A });
            assertThat(((HttpData) it.next()).array())
                    .isEqualTo(new byte[] { 0x1E, '\"', 'b', '\"', 0x0A });
            assertThat(((HttpData) it.next()).array())
                    .isEqualTo(new byte[] { 0x1E, '\"', 'c', '\"', 0x0A });
            assertThat(((HttpData) it.next()).isEmpty()).isTrue();
            assertThat(it.hasNext()).isFalse();
            isFinished.set(true);
        }
    });
    await().until(isFinished::get);
}
 
@OnClick(R.id.btn_demo_timing_5)
public void btn5_RunTask5Times_IntervalOf3s() {
  _log(String.format("D5 [%s] --- BTN click", _getCurrentTimestamp()));

  Flowable.just("Do task A right away")
      .doOnNext(input -> _log(String.format("D5 %s [%s]", input, _getCurrentTimestamp())))
      .delay(1, TimeUnit.SECONDS)
      .doOnNext(
          oldInput ->
              _log(
                  String.format(
                      "D5 %s [%s]", "Doing Task B after a delay", _getCurrentTimestamp())))
      .subscribe(
          new DefaultSubscriber<String>() {
            @Override
            public void onComplete() {
              _log(String.format("D5 [%s] XXX COMPLETE", _getCurrentTimestamp()));
            }

            @Override
            public void onError(Throwable e) {
              Timber.e(e, "something went wrong in TimingDemoFragment example");
            }

            @Override
            public void onNext(String number) {
              _log(String.format("D5 [%s]     NEXT", _getCurrentTimestamp()));
            }
          });
}
 
 类所在包
 类方法
 同包方法