下面列出了如何使用 io.reactivex.BackpressureStrategy 的 API 类示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that a subscriber which cancels before the upstream has emitted is not completed.
 */
@Test
public void testThatSubscriberCanCancelBeforeEmission() {
    // Upstream emits a single item from a separate thread after a 50 ms pause.
    Flowable<Integer> delayedSource = Flowable.create(emitter -> {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            emitter.onNext(1);
        });
        worker.start();
    }, BackpressureStrategy.DROP);
    Uni<Integer> uni = Uni.createFrom().publisher(delayedSource).map(value -> value + 1);

    UniAssertSubscriber<Integer> ts = UniAssertSubscriber.create();
    uni.subscribe().withSubscriber(ts);
    // Cancel immediately after subscribing, i.e. before the delayed emission happens.
    ts.cancel();
    ts.assertNotCompleted();
}
/**
 * Builds the resource stream.
 * <p>
 * On subscription the stream first emits {@code Resource.loading(null)}, then subscribes to
 * {@code fetchFromNet()}. A successful response is passed to {@code saveResult} and emitted as
 * {@code Resource.success}; a failure is routed through {@code RxHelper.handleError} and emitted as
 * {@code Resource.error}. The stream itself never signals completion or error.
 * <p>
 * The network subscription is registered on the emitter so that cancelling the downstream
 * subscription also disposes the in-flight request instead of leaking it.
 */
public SimpleNetBoundResource() {
    flowable = Flowable.create(emitter -> {
        // Loading
        emitter.onNext(Resource.loading(null));
        // Load data from the network; bind the subscription to the emitter's lifecycle.
        emitter.setDisposable(fetchFromNet().subscribe(
            response -> { // success
                saveResult(response);
                emitter.onNext(Resource.success(response));
            },
            e -> { // failure
                RxHelper.handleError(e).subscribe(error -> {
                    emitter.onNext(Resource.<T>error(error.getCode(), error.getMessage(), null));
                });
            }));
    }, BackpressureStrategy.BUFFER);
}
/**
 * Requests the campaign once the device is online and no campaign id is known yet.
 * <p>
 * On failure the stack trace is printed and the request is retried according to
 * {@code retryIfNetworkAvailable}, driven by connectivity changes. The resulting subscription is
 * tracked in {@code compositeDisposable}.
 */
private void handleCampaign() {
    compositeDisposable.add(
        ReactiveNetwork.observeInternetConnectivity()
            .subscribeOn(Schedulers.io())
            // Proceed only while connected and while no campaign id has been stored.
            .filter(connected -> connected && this.campaignId == null)
            .firstOrError()
            .flatMap(ignored -> campaignService.getCampaign())
            .retryWhen(errors -> errors.toObservable()
                .flatMap(error -> {
                    error.printStackTrace();
                    return ReactiveNetwork.observeInternetConnectivity();
                })
                .flatMap(available -> retryIfNetworkAvailable(available))
                .toFlowable(BackpressureStrategy.LATEST))
            .doOnSuccess(campaign -> processCampaign(campaign))
            .subscribe());
}
/**
 * Fetches the page at {@code url} with Jsoup and maps it to a {@link ZeroFiveNewsDetail} via
 * {@code JP.from}.
 *
 * @param url address of the news detail page
 * @return a buffered {@link Flowable} that emits the parsed detail and completes, or signals an
 *         error when the fetched element is {@code null} or fetching/parsing throws
 */
@Override
public Flowable<ZeroFiveNewsDetail> getAcgNewsDetail(final String url) {
    final FlowableOnSubscribe<ZeroFiveNewsDetail> source = new FlowableOnSubscribe<ZeroFiveNewsDetail>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsDetail> emitter) throws Exception {
            Element document = Jsoup.connect(url).get();
            if (document == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            emitter.onNext(JP.from(document, ZeroFiveNewsDetail.class));
            emitter.onComplete();
        }
    };
    return Flowable.create(source, BackpressureStrategy.BUFFER);
}
/**
 * Fetches the page at {@code typeUrl} with Jsoup and maps it to a {@link ZeroFiveNewsPage} via
 * {@code JP.from}.
 *
 * @param typeUrl address of the news listing page
 * @return a buffered {@link Flowable} that emits the parsed page and completes, or signals an
 *         error when the fetched element is {@code null} or fetching/parsing throws
 */
@Override
public Flowable<ZeroFiveNewsPage> getAcgNews(final String typeUrl) {
    final FlowableOnSubscribe<ZeroFiveNewsPage> source = new FlowableOnSubscribe<ZeroFiveNewsPage>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsPage> emitter) throws Exception {
            Element document = Jsoup.connect(typeUrl).get();
            if (document == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            emitter.onNext(JP.from(document, ZeroFiveNewsPage.class));
            emitter.onComplete();
        }
    };
    return Flowable.create(source, BackpressureStrategy.BUFFER);
}
/**
 * Wraps a Retrofit {@link Single} into a {@link Flowable} of {@link Resource} values so callers can
 * observe the state of a REST call (loading, error, or success, with status and message on error).
 * <p>
 * For every subscriber an anonymous {@code SimpleRemoteSourceMapper} is created around the emitter.
 * The instance is not retained here, so presumably its constructor starts the work — confirm
 * against {@code SimpleRemoteSourceMapper}.
 *
 * @param remote the Retrofit call to execute; returned as-is from {@code getRemote()}
 * @param onSave invoked with the response data from {@code saveCallResult}, e.g. to persist it in
 *               the local database; skipped when {@code null}
 * @param <T>    type of the response
 * @return a buffered {@link Flowable} for progress display and error handling
 */
public static <T> Flowable<Resource<T>> createRemoteSourceMapper(@Nullable Single<T> remote,
                                                                  @Nullable PlainConsumer<T> onSave) {
    return Flowable.create(emitter -> {
        new SimpleRemoteSourceMapper<T>(emitter) {
            @Override
            public Single<T> getRemote() {
                return remote;
            }

            @Override
            public void saveCallResult(T data) {
                // Saving is optional: nothing to do without a callback.
                if (onSave == null) {
                    return;
                }
                onSave.accept(data);
            }
        };
    }, BackpressureStrategy.BUFFER);
}
/**
 * Loads one page of media for a bucket on the IO scheduler.
 * <p>
 * The bucket id selects the loader: the string form of {@code Integer.MAX_VALUE} loads images and
 * videos, the string form of {@code Integer.MIN_VALUE} loads all videos, and any other id loads the
 * images of that bucket. An empty bucket id yields an empty stream.
 *
 * @param imageOffset offset passed to the selected loader
 * @param videoOffset offset passed to the selected loader
 * @param bucketId    bucket selector, see above
 * @return a {@link Flowable} emitting a single response and completing
 */
@Override
public Flowable<MediaEntity.MediaResponseData> getMediaList(int imageOffset, int videoOffset, String bucketId) {
    if (TextUtils.isEmpty(bucketId)) {
        return Flowable.empty();
    }

    final FlowableOnSubscribe<MediaEntity.MediaResponseData> loader;
    if (TextUtils.equals(bucketId, String.valueOf(Integer.MAX_VALUE))) {
        // Images and videos
        loader = emitter -> {
            emitter.onNext(handleImageAndVideoMediaList(imageOffset, videoOffset));
            emitter.onComplete();
        };
    } else if (TextUtils.equals(bucketId, String.valueOf(Integer.MIN_VALUE))) {
        // All videos
        loader = emitter -> {
            emitter.onNext(handleVideoMediaList(imageOffset, videoOffset));
            emitter.onComplete();
        };
    } else {
        // Images of one specific bucket
        loader = emitter -> {
            emitter.onNext(handleImageMediaList(imageOffset, videoOffset, bucketId));
            emitter.onComplete();
        };
    }
    return Flowable.create(loader, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
}
/**
 * Fetches the page at {@code url} with Jsoup and maps it to a {@link ScheduleOtherPage} via
 * {@code JP.from}.
 *
 * @param url address of the schedule page
 * @return a buffered {@link Flowable} that emits the parsed page and completes, or signals an
 *         error when the fetched element is {@code null} or fetching/parsing throws
 */
@Override
public Flowable<ScheduleOtherPage> getScheduleOtherPage(final String url) {
    final FlowableOnSubscribe<ScheduleOtherPage> source = new FlowableOnSubscribe<ScheduleOtherPage>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleOtherPage> emitter) throws Exception {
            Element document = Jsoup.connect(url).get();
            if (document == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            emitter.onNext(JP.from(document, ScheduleOtherPage.class));
            emitter.onComplete();
        }
    };
    return Flowable.create(source, BackpressureStrategy.BUFFER);
}
/**
 * Demo entry point: emits the twelve month abbreviations through a {@link Flowable} created with
 * {@link BackpressureStrategy#MISSING} and prints each one to standard output.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Flowable<String> monthSource = Flowable.create(emitter -> {
        try {
            String[] monthNames = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov",
                    "Dec" };
            for (String name : monthNames) {
                emitter.onNext(name);
            }
            emitter.onComplete();
        } catch (Exception failure) {
            emitter.onError(failure);
        }
    }, BackpressureStrategy.MISSING);

    monthSource.subscribe(System.out::println);
}
/**
 * Starts polling the balance and keeps the subscription in {@code compositeDisposable}.
 * <p>
 * {@code api.updateBalance()} is re-subscribed after each completion and after each error, both
 * delayed by {@code BuildConfig.UPDATE_BALANCE_SECONDS_DELAY} seconds. The outer {@code repeatWhen}
 * re-subscribes the whole chain in response to emissions of {@code subject}.
 *
 * @param subject source of signals for the outer repeat
 */
private void startListenBalance(final PublishSubject<Object> subject) {
    compositeDisposable.add(
        api.updateBalance()
            .repeatWhen(completions ->
                completions.delay(BuildConfig.UPDATE_BALANCE_SECONDS_DELAY, TimeUnit.SECONDS))
            .retryWhen(errors ->
                errors.delay(BuildConfig.UPDATE_BALANCE_SECONDS_DELAY, TimeUnit.SECONDS))
            .repeatWhen(completions ->
                completions.flatMap(ignored -> subject.toFlowable(BackpressureStrategy.LATEST)))
            .subscribe());
}
/**
 * Verifies that {@code observeAll(Class, BackpressureStrategy)} delivers every written
 * {@code ComplexObject} and silently ignores a written value of another type.
 */
@Test
public void testUpdatesAllChecked() throws Exception {
    final String key = "hello";
    final RxPaperBook book = RxPaperBook.with("UPDATES_ALL_CH", Schedulers.trampoline());
    final TestSubscriber<ComplexObject> observer = TestSubscriber.create();

    book.observeAll(ComplexObject.class, BackpressureStrategy.MISSING).subscribe(observer);
    observer.assertValueCount(0);

    // First write is observed.
    final ComplexObject first = ComplexObject.random();
    book.write(key, first).subscribe();
    observer.assertValueCount(1);
    observer.assertValues(first);

    // Overwriting the same key is observed as a second value.
    final ComplexObject second = ComplexObject.random();
    book.write(key, second).subscribe();
    observer.assertValueCount(2);
    observer.assertValues(first, second);

    // A value of a different type is written successfully but not delivered to the observer.
    final int mismatched = 3;
    book.write(key, mismatched).test().assertComplete().assertNoErrors();
    observer.assertValueCount(2);
    observer.assertValues(first, second);
    observer.assertNoErrors();
}
/**
 * Asks for the given permissions and exposes the outcome as a {@link Flowable}.
 * <p>
 * An accepted result is emitted via {@code onNext}; a non-accepted result is signalled as an
 * {@code Error} carrying that result. Per the original note, pass an empty list to request all
 * permissions declared in the manifest.
 *
 * @param permissions permissions to request
 * @return a {@link Flowable} using {@link BackpressureStrategy#LATEST}
 */
public Flowable<PermissionResult> requestAsFlowable(final List<String> permissions) {
    final FlowableOnSubscribe<PermissionResult> source = new FlowableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final FlowableEmitter<PermissionResult> emitter) throws Exception {
            final ResponseCallback callback = new ResponseCallback() {
                @Override
                public void onResponse(PermissionResult result) {
                    if (!result.isAccepted()) {
                        emitter.onError(new Error(result));
                        return;
                    }
                    emitter.onNext(result);
                }
            };
            runtimePermission
                .request(permissions)
                .onResponse(callback)
                .ask();
        }
    };
    return Flowable.create(source, BackpressureStrategy.LATEST);
}
/**
 * Produces a hot stream of random scores for a talk.
 * <p>
 * A single-threaded scheduler emits a new {@code Score} with the given title and a random integer
 * score in {@code [0, 9]} immediately and then every two seconds. The source is shared, published
 * and connected right away, so emission starts as soon as this method is called.
 *
 * @param title title assigned to every emitted score
 * @return a buffered {@link Publisher} of scores
 */
public Publisher<Score> talkScores(final String title) {
    Observable<Score> observable = Observable.create(e -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Stop the timer thread if the upstream subscription is ever disposed.
        e.setCancellable(executorService::shutdownNow);
        executorService.scheduleAtFixedRate(() -> {
            Score s = Score.builder()
                .title(title)
                .score(Integer.valueOf((int) Math.floor(Math.random() * 10)))
                .build();
            e.onNext(s);
        }, 0, 2, TimeUnit.SECONDS);
    });
    // Parameterized (was a raw type) so the returned Flowable is checked as Flowable<Score>.
    ConnectableObservable<Score> connectableObservable = observable.share().publish();
    connectableObservable.connect();
    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Creates a reporter from the builder's settings and starts its message pipeline.
 * <p>
 * After copying the configuration into fields, the constructor creates a {@link Flowable} with this
 * instance as the source, passes it through {@code initBackpressurePolicy}, moves processing to
 * {@code Schedulers.single()}, groups messages with {@code MessagePartitioner} and subscribes a
 * {@code MessageGroupSubscriber}.
 *
 * @param builder provides sender, metrics, timeouts, buffer limits, overflow strategy and scheduler
 */
private RxReporter(Builder<M, R> builder) {
this.sender = builder.sender;
this.metrics = builder.metrics;
this.messageTimeoutNanos = builder.messageTimeoutNanos;
this.bufferedMaxMessages = builder.bufferedMaxMessages;
this.pendingMaxMessages = builder.pendingMaxMessages;
this.overflowStrategy = builder.overflowStrategy;
this.scheduler = builder.scheduler;
// NOTE(review): `this` is handed to Flowable.create before the constructor returns; presumably this
// class implements FlowableOnSubscribe — confirm. All fields above must be assigned before this point.
Flowable<MessagePromise<R>> flowable = Flowable.create(this, BackpressureStrategy.MISSING);
// MISSING applies no backpressure handling in create(); initBackpressurePolicy is expected to add it
// (presumably based on overflowStrategy / pendingMaxMessages — confirm against that method).
initBackpressurePolicy(flowable)
.observeOn(Schedulers.single())
.groupBy(new MessagePartitioner())
.subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
/**
 * Exposes the lines of a UTF-8 encoded stream as a {@link Flowable}.
 * <p>
 * Every line is emitted, including blank ones, and the scanner (and with it the stream) is closed
 * when reading ends. Reading stops early once the downstream cancels. An {@code IOException} that
 * the scanner recorded while reading is signalled as an error instead of looking like a normal end
 * of input.
 *
 * @param inputStream stream to read; closed by this method's scanner
 * @return a buffered {@link Flowable} of lines that completes at end of input
 */
private Flowable<String> flowableInputStreamScanner(InputStream inputStream) {
    return Flowable.create(subscriber -> {
        try (Scanner scanner = new Scanner(inputStream, "UTF-8")) {
            // hasNextLine() pairs with nextLine(); hasNext() looks for tokens and would drop
            // trailing blank lines. Check cancellation first to avoid a needless blocking read.
            while (!subscriber.isCancelled() && scanner.hasNextLine()) {
                subscriber.onNext(scanner.nextLine());
            }
            // Scanner swallows read failures; rethrow so Flowable.create routes them to onError.
            if (scanner.ioException() != null) {
                throw scanner.ioException();
            }
        }
        subscriber.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Inflates the layout, binds views and wires the three text-change streams before combining them.
 *
 * @param savedInstanceState forwarded to the superclass
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    unbinder = ButterKnife.bind(this);

    // Each stream skips the first text-change emission and keeps only the latest value under backpressure.
    firstNameChangeObservable = RxTextView.textChanges(etFirstName)
        .skip(1)
        .toFlowable(BackpressureStrategy.LATEST);
    lastNameChangeObservable = RxTextView.textChanges(etLastName)
        .skip(1)
        .toFlowable(BackpressureStrategy.LATEST);
    emailChangeObservable = RxTextView.textChanges(etEmail)
        .skip(1)
        .toFlowable(BackpressureStrategy.LATEST);

    combineLatestEvents();
}
/**
 * Bridges calendar change callbacks into a {@link Flowable}.
 * <p>
 * A listener forwarding every change set to the subscriber is registered on subscription and
 * removed again when the subscription is cancelled.
 *
 * @return a buffered {@link Flowable} of change sets
 */
public Flowable<ChangeSet> observeChangesOnCalendar() {
    return Flowable.create((FlowableEmitter<ChangeSet> emitter) -> {
        CalendarObjectChangeListener listener = changes -> emitter.onNext(changes);
        // Register first, then install the cancellable, so an already-cancelled emitter
        // still removes the listener that was just added.
        addChangeListener(listener);
        emitter.setCancellable(() -> removeChangeListener(listener));
    }, BackpressureStrategy.BUFFER);
}
/**
 * Emits the names of the user's repositories that were pushed to within the last week.
 * <p>
 * Each name is emitted with a trailing space. The one-week cut-off is computed once per
 * subscription so that every repository is compared against the same instant.
 *
 * @param user account whose repositories are queried
 * @return a buffered {@link Flowable} of repository names that completes after the last one
 */
@Override
public Flowable<String> getReposInWeek(String user) {
    return Flowable.create(emitter -> {
        // Hoisted out of the filter: avoids one clock read per repository and a drifting cut-off.
        LocalDateTime oneWeekAgo = LocalDateTime.now().minusWeeks(1);
        gitHbubRepos.getRepos(user).stream()
            .filter(repo -> repo.getPushed().isAfter(oneWeekAgo))
            .map(item -> item.getName() + " ")
            .forEach(emitter::onNext);
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Emits the commits returned by {@code gitHbubRepos.getCommitsInWeek} for the given repository.
 *
 * @param user owner of the repository
 * @param repo repository name
 * @return a buffered {@link Flowable} of commits that completes after the last one
 */
@Override
public Flowable<Commit> getCommitsInWeek(String user, String repo) {
    return Flowable.create(emitter -> {
        gitHbubRepos.getCommitsInWeek(user, repo)
            .forEach(commit -> emitter.onNext(commit));
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Loads the alarm data and refreshes the view.
 * <p>
 * The alarms are read from the database on the IO scheduler and wrapped into task cards; the
 * progress indicator is shown on the main thread when the work is subscribed and hidden again on
 * the main thread once loading has finished, after which the adapter is notified.
 * <p>
 * The source completes after its single emission so the subscription is released once the list
 * has been refreshed.
 **/
private void loadData() {
    Flowable.create(new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> e) throws Exception {
            reset();
            /* Load the database data on the IO thread */
            List<AlarmClock> alarmClocks = mAssistDao.findAllAlarmAsc(false);
            week = Calendar.getInstance().get(Calendar.DAY_OF_WEEK);
            week = week - 1 == 0 ? 7 : week - 1; // today's day of week, Monday = 1 ... Sunday = 7
            week = week + 1 == 8 ? 1 : week + 1; // tomorrow's day of week
            for (AlarmClock alarm : alarmClocks) {
                TaskCard<AlarmClock> card = new TaskCard<>(alarm, TaskCard.TaskState.ACTIVE);
                if (alarm.getValid() == 0)
                    card.taskState = TaskCard.TaskState.INVALID;
                updateAlarmCount(AccountingActivity.TYPE_ADD, alarm);
                alarmDatas.add(card);
            }
            e.onNext(0);
            // One-shot load: terminate the stream so the chain is disposed after the refresh.
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io()) // thread on which the source's subscribe() runs
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription subscription) throws Exception {
                mCpbLoad.setVisibility(View.VISIBLE);
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread()) // makes doOnSubscribe above run on the main thread
        .observeOn(AndroidSchedulers.mainThread()) // thread on which the subscriber is notified
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                /* Back on the main thread: refresh the list */
                mCpbLoad.setVisibility(View.GONE);
                mAdapter.notifyDataSetChanged();
            }
        });
}
/**
 * Demo: pushes one million integers through a {@link PublishSubject} converted with
 * {@link BackpressureStrategy#MISSING}, buffered in groups of ten and observed on the computation
 * scheduler. Batches and errors are both sent to {@code log}.
 */
private void demo4() {
    PublishSubject<Integer> subject = PublishSubject.create();

    subject.toFlowable(BackpressureStrategy.MISSING)
        .buffer(10)
        .observeOn(Schedulers.computation())
        .subscribe(batch -> log("s", batch.toString()), this::log);

    int next = 0;
    while (next < 1000000) {
        subject.onNext(next);
        next++;
    }
}
/**
 * Registers adapters for the RxJava 2 reactive types with the given registry.
 * <p>
 * Each registration supplies a type descriptor, a function converting the RxJava type to a
 * Publisher, and a function converting a Publisher back to the RxJava type.
 *
 * @param registry registry receiving the adapters
 */
void registerAdapters(ReactiveAdapterRegistry registry) {
// Flowable: multi-value; already a Publisher, so it is only cast.
// NOTE(review): the back-conversion uses an unqualified `Flowable` while the rest of the method uses
// io.reactivex.Flowable — confirm both resolve to the same class.
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
Flowable::fromPublisher
);
// Observable: multi-value; converted with an unbounded buffer as backpressure strategy.
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
);
// Single: exactly one required value.
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
);
// Maybe: zero or one value.
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
);
// Completable: no values, only the terminal signal; elements of the Publisher are ignored.
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
);
}
/**
 * Executes every query from {@code queryStream} and streams the raw response line by line.
 *
 * @param queryStream publisher of query strings; must not be {@code null}
 * @param dialect     dialect applied to each query, may be {@code null}
 * @param org         organization the queries run against; must not be empty
 * @return a {@link Flowable} of response lines, buffered per query
 */
@Nonnull
@Override
public Flowable<String> queryRaw(@Nonnull final Publisher<String> queryStream,
                                 @Nullable final Dialect dialect,
                                 @Nonnull final String org) {
    Arguments.checkNotNull(queryStream, "queryStream");
    Arguments.checkNonEmpty(org, "org");

    return Flowable
        .fromPublisher(queryStream)
        .map(query -> service.postQueryResponseBody(null, "application/json",
            null, org, null, new Query().query(query).dialect(dialect)))
        .flatMap(call -> {
            Observable<String> lines = Observable.create(emitter -> {
                BiConsumer<Cancellable, String> onLine = (cancellable, line) -> {
                    // Stop reading the response once the subscriber is gone.
                    if (emitter.isDisposed()) {
                        cancellable.cancel();
                        return;
                    }
                    emitter.onNext(line);
                };
                queryRaw(call, onLine, emitter::onError, emitter::onComplete, false);
            });
            return lines.toFlowable(BackpressureStrategy.BUFFER);
        });
}
/**
 * Sends a chat message to the server.
 * <p>
 * The message text is emitted on the socket as {@code EVENT_NEW_MESSAGE} without waiting for an
 * acknowledgement; the same message object is then emitted downstream and the stream completes.
 *
 * @param chatMessage message to send
 * @return a buffered {@link Flowable} that emits {@code chatMessage} once and completes
 */
@Override
public Flowable<ChatMessage> sendMessage(@NonNull final ChatMessage chatMessage) {
    return Flowable.create(new FlowableOnSubscribe<ChatMessage>() {
        @Override
        public void subscribe(FlowableEmitter<ChatMessage> emitter) throws Exception {
            /*
             * Socket.io supports acking messages.
             * This feature can be used as
             * mSocket.emit("EVENT_NEW_MESSAGE", chatMessage.getMessage(), new Ack() {
             *   @Override
             *   public void call(Object... args) {
             *     // Do something with args
             *
             *     // On success
             *     emitter.onNext(chatMessage);
             *
             *     // On error
             *     emitter.onError(new Exception("Sending message failed."));
             *   }
             * });
             *
             * */
            mSocket.emit(EVENT_NEW_MESSAGE, chatMessage.getMessage());
            emitter.onNext(chatMessage);
            // Exactly one message is sent per subscription, so terminate the stream.
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);
}
/**
 * Demo comparing {@code throttleLast} and {@code throttleLatest} on a simulated ramp PV,
 * printing the throttled values of both for eleven seconds.
 */
@Test
public void demoThrottleLastVsLatest() throws Exception
{
    final PV pv = PVPool.getPV("sim://ramp(1, 1000, 1)");

    System.out.println("Throttle a 1Hz value as 'Last' vs. 'Latest' every 3 seconds");
    System.out.println("'Latest' receives the first value, while 'Last' has initial latency.");
    System.out.println("From then on they throttle in similar way");

    final Disposable lastSubscription = pv
        .onValueEvent(BackpressureStrategy.BUFFER)
        .throttleLast(3, TimeUnit.SECONDS)
        .subscribe(value -> System.out.println("Last  : " + value + " @ " + Instant.now()));

    final Disposable latestSubscription = pv
        .onValueEvent(BackpressureStrategy.BUFFER)
        .throttleLatest(3, TimeUnit.SECONDS)
        .subscribe(value -> System.out.println("Latest: " + value + " @ " + Instant.now()));

    // Let both subscriptions receive a few throttled updates.
    Thread.sleep(11000);

    latestSubscription.dispose();
    lastSubscription.dispose();
    PVPool.releasePV(pv);
}
/**
 * Loads the bucket list on the IO scheduler.
 *
 * @return a buffered {@link Flowable} that emits the result of {@code handleBucketData()} once and
 *         completes
 */
@Override
public Flowable<List<BucketEntity>> getBucketList() {
    FlowableOnSubscribe<List<BucketEntity>> loader = emitter -> {
        emitter.onNext(handleBucketData());
        emitter.onComplete();
    };
    return Flowable.create(loader, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io());
}
/**
 * Queries the id, title, description and completed columns of every row in the task table.
 *
 * @return a buffered {@link Flowable} of task lists, each row mapped with
 *         {@code mTaskMapperFunction}
 */
@Override
public Flowable<List<Task>> getTasks() {
    String columns = TextUtils.join(",", new String[] {
        TaskEntry.COLUMN_NAME_ENTRY_ID,
        TaskEntry.COLUMN_NAME_TITLE,
        TaskEntry.COLUMN_NAME_DESCRIPTION,
        TaskEntry.COLUMN_NAME_COMPLETED
    });
    String sql = "SELECT " + columns + " FROM " + TaskEntry.TABLE_NAME;

    return mDatabaseHelper
        .createQuery(TaskEntry.TABLE_NAME, sql)
        .mapToList(mTaskMapperFunction)
        .toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Naive update subscription for saved objects.
 * <p>
 * Every updated value is cast to {@code T} without a check, so a {@link ClassCastException} is
 * thrown when the types do not match. For a safely checked and filtered version use
 * {@link #observeAll(Class, BackpressureStrategy)}.
 *
 * @param backPressureStrategy how the backpressure is handled downstream
 * @param <T>                  type the updated values are cast to
 * @return hot observable
 */
@SuppressWarnings("unchecked")
public <T> Flowable<T> observeAllUnsafe(BackpressureStrategy backPressureStrategy) {
    // Keeps only the value half of each (key, value) update.
    final Function<Pair<String, ?>, T> toValue = new Function<Pair<String, ?>, T>() {
        @Override
        public T apply(Pair<String, ?> update) {
            return (T) update.second;
        }
    };
    return updates.toFlowable(backPressureStrategy).map(toValue);
}
/**
 * Creates a client from the builder's settings, wires the response stream and starts the channel.
 * <p>
 * The response stream merges the channel's output and error streams, maps each item with
 * {@code prepareResponse}, buffers under backpressure with configured sizes, is observed on the
 * computation scheduler and is shared between subscribers. This client subscribes itself, the
 * optional game controller is subscribed as well, and finally the channel provider is started.
 *
 * @param builder provides connection address, timeouts, tracing settings, the optional game
 *                controller and the connection-lost callback
 */
private S2Client(Builder builder) {
connectToIp = builder.connectToIp;
connectToPort = builder.connectToPort;
requestTimeoutInMillis = builder.requestTimeoutInMillis;
connectTimeoutInMillis = builder.connectTimeoutInMillis;
traced = builder.traced;
tracer = builder.tracer;
game = builder.game;
log.info("Starting: {}", this);
Channel channel = channelProvider.getChannel();
channel.onConnectionLost(builder.onConnectionLost);
// ERROR strategy followed by onBackpressureBuffer: overflow is absorbed by the bounded buffer
// whose size comes from configuration.
responseStream = channel.outputStream().mergeWith(channel.errorStream())
.map(this::prepareResponse)
.toFlowable(BackpressureStrategy.ERROR)
.onBackpressureBuffer(cfg().getInt(OcraftApiConfig.CLIENT_BUFFER_SIZE_RESPONSE_BACKPRESSURE))
.observeOn(Schedulers.computation(), false, cfg().getInt(CLIENT_BUFFER_SIZE_RESPONSE_STREAM))
.publish()
.autoConnect()
// Every subscriber registers with `await` and deregisters on cancel.
// NOTE(review): `await` looks like a java.util.concurrent.Phaser — confirm.
.doOnSubscribe(s -> await.register())
.doOnCancel(await::arriveAndDeregister);
// Subscribe this client; the arriveAndDeregister that follows presumably balances a party
// registered elsewhere (not visible here) — confirm before reordering.
responseStream().subscribe(this);
await.arriveAndDeregister();
// Same subscribe/deregister sequence for the game controller, when one was configured.
Optional.ofNullable(game).ifPresent(s2Controller -> {
responseStream().subscribe(s2Controller);
await.arriveAndDeregister();
});
// Start the connection only after all subscribers are attached.
channelProvider.start(connectToIp, connectToPort, connectTimeoutInMillis);
}
/**
 * Creates a {@link Flowable} that emits the given value once and then completes.
 *
 * @param t   value to emit
 * @param <T> type of the value
 * @return a buffered {@link Flowable}; an exception raised while emitting is signalled via
 *         {@code onError}
 */
public static <T> Flowable<T> createData(final T t) {
    final FlowableOnSubscribe<T> source = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> downstream) throws Exception {
            try {
                downstream.onNext(t);
                downstream.onComplete();
            } catch (Exception failure) {
                downstream.onError(failure);
            }
        }
    };
    return Flowable.create(source, BackpressureStrategy.BUFFER);
}