下面列出了怎么用io.reactivex.functions.Consumer的API类实例代码及写法,或者点击链接到github查看源代码。
private void setProgress(final Message msg, final ProgressView progressView) {
if (mDisposable2 == null || mDisposable2.isDisposed()) {
mDisposable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
long curPosition = State.transfers.getProgress(msg.getId());
LogUtil.i(TAG, "progress interval size:"
+ msg.getSize()
+ ",curPosition:"
+ curPosition);
progressView.setProgress(msg.getSize(),
State.transfers.getProgress(msg.getId()));
if (msg.getSize() <= curPosition) {
mDisposable2.dispose();
mDisposable2 = null;
}
}
});
}
}
@Override
public void attachView(PrivateArticleContract.PrivateArticleView view) {
super.attachView(view);
addEventSubscribe(RxBus.getDefault().toFlowable(ShareArticleEvent.class).subscribe(new Consumer<ShareArticleEvent>() {
@Override
public void accept(ShareArticleEvent shareArticleEvent) throws Exception {
if(shareArticleEvent.ismIsShareSuccess()){
//分享成功 刷新页面数据
getPrivateArticleDataList(1,false);
mView.showErrorMsg(shareArticleEvent.getmMsg());
}else{
mView.showErrorMsg(shareArticleEvent.getmMsg());
}
}
}));
}
public void doIt(
Completable flowable,
Action complete,
Consumer<? super Throwable> error
) {
long current = disposableAddress.size();
Disposable subscription = flowable.
timeout(DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.doOnError(throwable -> {
remove(current);
})
.doOnComplete(() -> {
remove(current);
})
.subscribe(complete, error);
disposableAddress.append(current, subscription);
}
@Test
@SmallTest
@UiThreadTest
public void testOneCellWithMultiViewExposure() {
Consumer<ClickExposureCellOp> consumer1 = new Consumer<ClickExposureCellOp>() {
@Override
public void accept(ClickExposureCellOp clickEvent) throws Exception {
assertTrue(Looper.myLooper() == Looper.getMainLooper());
assertTrue(clickEvent.getArg1() == mView1 || clickEvent.getArg1() == mView2);
assertTrue(clickEvent.getArg2() == mBaseCell1);
Log.d("RxExposureSupportTest", "testOneCellWithMultiViewExposure mEventType " + clickEvent.getArg3());
Log.d("RxExposureSupportTest", "testOneCellWithMultiViewExposure view " + clickEvent.getArg1());
}
};
//mExposureSupport.setConsumer(consumer1);
mBaseCell1.exposure(mView1);
mBaseCell1.exposure(mView2);
}
@Test
public void testConvertToSingleFinished() throws Exception {
T completable = originalFutureTestHelper.createFinishedFuture();
Single<String> single = toSingle(completable);
Consumer<String> onSuccess = mockAction();
Consumer<Throwable> onError = mockAction();
single.subscribe(v -> {
onSuccess.accept(v);
latch.countDown();
},
onError);
latch.await();
verify(onSuccess).accept(VALUE);
verifyZeroInteractions(onError);
assertSame(completable, toFuture(single));
}
private void createSpeedObserver() {
if (mDisposableSpeed != null && !mDisposableSpeed.isDisposed()) {
return;
}
mDisposableSpeed = Observable.interval(1, 1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.computation())
.map(new Function<Long, Float>() {
private long lastDownloadLength = 0;
@Override
public Float apply(Long ms) throws Exception {
float bytesPerSecond = UnitFormatUtils.calculateSpeed(mInfo.downloadLength - lastDownloadLength, 1);
lastDownloadLength = mInfo.downloadLength;
return bytesPerSecond;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Float>() {
@Override
public void accept(Float speedPerSecond) throws Exception {
if (mSpeedListener != null) {
mSpeedListener.onSpeedChange(speedPerSecond, UnitFormatUtils.formatSpeedPerSecond(speedPerSecond));
}
}
});
}
void verifyRawQueryBehaviorForCursor(@NonNull Flowable<Cursor> flowable) {
new FlowableBehaviorChecker<Cursor>()
.flowable(flowable)
.expectedNumberOfEmissions(1)
.testAction(new Consumer<Cursor>() {
@Override
public void accept(Cursor cursor) {
// Get Operation should be subscribed to changes of tables from Query
verify(storIOSQLite).observeChanges(LATEST);
verify(storIOSQLite).defaultRxScheduler();
verifyRawQueryBehaviorForCursor(cursor);
}
})
.checkBehaviorOfFlowable();
assertThat(rawQuery.observesTables()).isNotNull();
}
public MutableLiveData<BookBean> getBook() {
final MutableLiveData<BookBean> data = new MutableLiveData<>();
Disposable subscribe = HttpClient.Builder.getDouBanService().getBook(bookType.get(), mStart, mCount)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BookBean>() {
@Override
public void accept(BookBean bookBean) throws Exception {
data.setValue(bookBean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
data.setValue(null);
}
});
addDisposable(subscribe);
return data;
}
/**
* listener调用方式,在主线程订阅并将返回结果通过 listener 通知调用方
*
* @param listener 接收回调结果
*/
public void launch(final OnCompressListener listener) {
asObservable().observeOn(AndroidSchedulers.mainThread()).doOnSubscribe(
new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
listener.onStart();
}
})
.subscribe(new Consumer<File>() {
@Override
public void accept(File file) throws Exception {
listener.onSuccess(file);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
listener.onError(throwable);
}
});
}
/**
* 保存图片到本地
*
* @param fileName 文件名
*/
private void saveImageToLocal(final String fileName) {
Observable.create(new ObservableOnSubscribe<File>() {
@Override
public void subscribe(ObservableEmitter<File> e) throws Exception {
e.onNext(Glide.with(ImageBrowseActivity.this)
.load(mImageUrl)
.downloadOnly(Target.SIZE_ORIGINAL, Target.SIZE_ORIGINAL)
.get());
e.onComplete();
}
}).compose(RxHelper.<File>rxSchedulerHelper()).subscribe(new Consumer<File>() {
@Override
public void accept(File file) throws Exception {
saveImage(fileName, file);
}
});
}
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
Function<? super ResultSet, ? extends T> f) throws SQLException {
ResultSet rsActual = stmt.stmt.getResultSet();
Callable<ResultSet> initialState = () -> rsActual;
BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
if (rs.next()) {
T v = f.apply(rs);
log.debug("emitting {}", v);
emitter.onNext(v);
} else {
log.debug("completed");
emitter.onComplete();
}
};
Consumer<ResultSet> disposeState = Util::closeSilently;
return Flowable.generate(initialState, generator, disposeState);
}
@Test
public void resultThrowingInOnSuccessDeliveredToPlugin() {
server.enqueue(new MockResponse());
final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
if (!throwableRef.compareAndSet(null, throwable)) {
throw Exceptions.propagate(throwable);
}
}
});
RecordingMaybeObserver<Result<String>> observer = subscriberRule.create();
final RuntimeException e = new RuntimeException();
service.result().subscribe(new ForwardingObserver<Result<String>>(observer) {
@Override
public void onSuccess(Result<String> value) {
throw e;
}
});
assertThat(throwableRef.get()).isSameAs(e);
}
@Override
public void loadDailyDetail(String id) {
if (mIModel == null)
return;
mRxManager.register(mIModel.getDailyDetail(id).subscribe(new Consumer<ZhihuDailyDetailBean>() {
@Override
public void accept(ZhihuDailyDetailBean zhihuDailyDetailBean) throws Exception {
if (mIView != null)
mIView.showDailyDetail(zhihuDailyDetailBean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
if (mIView != null) {
mIView.showToast("网络异常");
mIView.showNetworkError();
}
}
}));
}
@Override
public Completable writeDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
@NonNull final UUID descriptorUuid, @NonNull final byte[] data) {
return discoverServices()
.flatMap(new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
@Override
public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices rxBleDeviceServices) {
return rxBleDeviceServices.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
}
})
.doOnSuccess(new Consumer<BluetoothGattDescriptor>() {
@Override
public void accept(BluetoothGattDescriptor bluetoothGattDescriptor) throws Exception {
bluetoothGattDescriptor.setValue(data);
}
})
.toCompletable();
}
/**
* 合成并显示回复文本
**/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));
SynthesizerBase.get().startSpeakAbsolute(content)
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
if (tracks != null)
XmlyManager.get().getPlayer().playList(tracks, finalPlayIndex);
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
@Test public void readThrowsOnSubscribeWhenCannotStoreSecurely() {
shadowContext.grantPermissions(USE_FINGERPRINT);
when(fingerprintManager.isHardwareDetected()).thenReturn(true);
when(fingerprintManager.hasEnrolledFingerprints()).thenReturn(true);
Observable<ReadResult> read = whorlwind.read("a");
shadowContext.denyPermissions(USE_FINGERPRINT);
try {
read.blockingForEach(new Consumer<ReadResult>() {
@Override public void accept(ReadResult readResult) throws Exception {
fail();
}
});
fail();
} catch (IllegalStateException expected) {
assertThat(expected).hasMessage(
"Can't store securely. Check canStoreSecurely() before attempting to read/write.");
}
verifyZeroInteractions(storage);
}
/**
* 更新余额、当日收支
**/
public void updateBalance(final int type, final List<TaskCard<Accounting>> taskcards) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
CountToday();
CountBalance(type, taskcards);
e.onNext(0);
}
})
.subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程
.observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
if (AppConfig.dPreferences.getBoolean(AppConfig.HAS_AMOUNT, false))
mTvBalance.setText("¥" + AssistUtils.formatAmount(balance));
mAdapter.notifyItemChanged(0);
}
});
}
/**
* 用RxJava添加订阅者
*
* @param subscriberMethod d
*/
@SuppressWarnings("unchecked")
private void addSubscriber(final SubscriberMethod subscriberMethod) {
Flowable flowable;
if (subscriberMethod.code == -1) {
flowable = toObservable(subscriberMethod.eventType);
} else {
flowable = toObservable(subscriberMethod.code, subscriberMethod.eventType);
}
Disposable subscription = postToObservable(flowable, subscriberMethod)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
callEvent(subscriberMethod, o);
}
});
addSubscriptionToMap(subscriberMethod.subscriber.getClass(), subscription);
}
@Test
public void testSelfObservableDelete() throws Exception {
final AtomicInteger count = new AtomicInteger();
Disposable disposable = data.select(Person.class).get().observableResult().subscribe(
new Consumer<Result<Person>>() {
@Override
public void accept(Result<Person> persons) {
count.incrementAndGet();
}
});
Person person = randomPerson();
data.insert(person).blockingGet();
data.delete(person).blockingGet();
assertEquals(3, count.get());
disposable.dispose();
}
@Test(expected = IllegalStateException.class)
public void assertThatFlowableEmitsOnceNegative() {
final Flowable<Integer> testFlowable = Flowable.just(1, 2);
new FlowableBehaviorChecker<Integer>()
.flowable(testFlowable)
.expectedNumberOfEmissions(1)
.testAction(new Consumer<Integer>() {
final AtomicInteger numberOfInvocations = new AtomicInteger(0);
@Override
public void accept(@NonNull Integer integer) throws Exception {
if (numberOfInvocations.incrementAndGet() > 1) {
fail("Should be called once");
}
}
})
.checkBehaviorOfFlowable();
}
@Override
protected void onStart() {
super.onStart();
mActivityLFEventLifeCycleProvider.emitNext(ActivityLFEvent.START);
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onStart()");
}
})
.compose(mActivityLFEventLifeCycleProvider.<Long>bindUntil(ActivityLFEvent.DESTROY))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onStart(), running until in onDestroy(): " + num);
}
});
}
/**
* Returns a Flowable stream of byte arrays from the given
* {@link InputStream} between 1 and {@code bufferSize} bytes.
*
* @param is
* input stream of bytes
* @param bufferSize
* max emitted byte array size
* @return a stream of byte arrays
*/
public static Flowable<byte[]> from(final InputStream is, final int bufferSize) {
return Flowable.generate(new Consumer<Emitter<byte[]>>() {
@Override
public void accept(Emitter<byte[]> emitter) throws Exception {
byte[] buffer = new byte[bufferSize];
int count = is.read(buffer);
if (count == -1) {
emitter.onComplete();
} else if (count < bufferSize) {
emitter.onNext(Arrays.copyOf(buffer, count));
} else {
emitter.onNext(buffer);
}
}
});
}
/**
* 轮播图
*/
public void showBannerPage(final RequestImpl listener) {
Disposable subscribe = HttpClient.Builder.getTingServer().getFrontpage()
.observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io())
.subscribe(new Consumer<FrontpageBean>() {
@Override
public void accept(FrontpageBean frontpageBean) throws Exception {
if (listener != null) {
listener.loadSuccess(frontpageBean);
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
if (listener != null) {
listener.loadFailed();
}
}
});
if (listener != null) {
listener.addSubscription(subscribe);
}
}
public Flowable<Boolean> deleteScheduleCacheByUrl(final String url) {
return RxRealmUtils
.flowableExec(realmConfiguration, new Consumer<Pair<FlowableEmitter, Realm>>() {
@Override
public void accept(final Pair<FlowableEmitter, Realm> pair) throws Exception {
pair.second.executeTransactionAsync(new Transaction() {
@Override
public void execute(Realm r) {
final boolean isSuccess = r.where(ScheduleCache.class)
.equalTo("scheduleUrl", url)
.findAll()
.deleteAllFromRealm();
pair.first.onNext(isSuccess);
pair.first.onComplete();
}
});
}
});
}
/**
* 记录上一次番剧观看位置
*
* @param lastWatchPos 上一次观看位置
*/
public void updateScheduleReadRecord(final int lastWatchPos) {
addSubscribe(
mModel.updateScheduleWatchRecord(curScheduleCache, lastWatchPos)
.compose(RxUtil.<ScheduleCache>rxSchedulerHelper())
.subscribe(new Consumer<ScheduleCache>() {
@Override
public void accept(ScheduleCache scheduleCache) throws Exception {
curScheduleCache.setLastWatchPos(lastWatchPos);
mView.showScheduleCacheStatus(curScheduleCache);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
})
);
}
public void setMyLocationAsPickup() {
if (mState.getOrder() == null
&& mState.currentLocation != null) {
removePickupMarker();
if (mState.pickupPlace == null) {
mState.pickupPlace = new Place();
}
mState.isMyLocationBind = true;
mState.pickupPlace.latitude = mState.currentLocation.getLatitude();
mState.pickupPlace.longitude = mState.currentLocation.getLongitude();
mState.pickupPlace.preview = myLocationText;
disposables.add(MapUtils.getAddress(mContext, mState.pickupPlace.getLatLng()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
mState.pickupPlace.address = s;
mView.updatePickup(mState.pickupPlace);
updateMapCamera();
//TODO debug
updatePickupPlace(mState.pickupPlace);
}
}));
}
}
/**
* Test app run.
*/
@Test
public void testAppRun()
{
final One<Integer> counter=new One<>(0);
final BindAppPreferences sp=BindAppPreferences.getInstance();
sp.edit().putDescription("start").commit();
sp.getDescriptionAsObservable().subscribe(new Consumer<String>() {
@Override
public void accept(String result) throws Exception {
Logger.info("modify "+result);
if (counter.value0==0)
assertTrue(result.equals("start"));
if (counter.value0>=1)
assertTrue(result.equals("end"));
counter.value0=counter.value0+1;
}
});
sp.edit().putDescription("end").putName("name").commit();
//
}
private void initObservable() {
subject = PublishSubject.create();
subject.observeOn(EventThread.getScheduler(thread))
.subscribe(new Consumer() {
@Override
public void accept(Object event) {
try {
if (valid) {
handleEvent(event);
}
} catch (InvocationTargetException e) {
throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberEvent.this, e);
}
}
});
}
@Override
protected void didBecomeActive(@Nullable Bundle savedInstanceState) {
super.didBecomeActive(savedInstanceState);
presenter
.squareClicks()
.subscribe(
new Consumer<BoardCoordinate>() {
@Override
public void accept(BoardCoordinate xy) throws Exception {
if (board.cells[xy.getX()][xy.getY()] == null) {
if (currentPlayer == MarkerType.CROSS) {
board.cells[xy.getX()][xy.getY()] = MarkerType.CROSS;
board.currentRow = xy.getX();
board.currentCol = xy.getY();
presenter.addCross(xy);
currentPlayer = MarkerType.NOUGHT;
} else {
board.cells[xy.getX()][xy.getY()] = MarkerType.NOUGHT;
board.currentRow = xy.getX();
board.currentCol = xy.getY();
presenter.addNought(xy);
currentPlayer = MarkerType.CROSS;
}
}
if (board.hasWon(MarkerType.CROSS)) {
presenter.setPlayerWon(playerOne);
} else if (board.hasWon(MarkerType.NOUGHT)) {
presenter.setPlayerWon(playerTwo);
} else if (board.isDraw()) {
presenter.setPlayerTie();
} else {
updateCurrentPlayer();
}
}
});
updateCurrentPlayer();
}
@Override
public void logout() {
mSettingView.showLoading();
mLogoutDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
boolean result = true;
if (PreferenceUtils.getBoolean(PreferenceUtils.CLEAR_MSG_LOGOUT, false)) {
result = mUserModel.clearChatHistory();
}
if (State.isLoggedIn()) {
State.logout();
}
emitter.onNext(result);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
mSettingView.hideLoading();
ServiceManager.stopToxService();
TokApplication.getInstance().finishOpenedActivities();
PageJumpIn.jumpLoginPage(mSettingView.getActivity());
mSettingView.viewDestroy();
}
});
}