下面列出了 io.reactivex.functions.LongConsumer 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a {@link Flowable} backed by a {@link ReplayProcessor} that is fed on demand.
 *
 * <p>Every downstream request of {@code n} starts a {@code take(...)} round with a fresh
 * counter initialised to {@code n}. When the downstream cancels, the future currently held
 * in the shared reference (if any) is cancelled.
 *
 * <p>The cancel hook is attached to the returned chain. Calling {@code p.doOnCancel(...)}
 * and discarding its result only builds an unused decorator, so the hook would never run.
 *
 * @param callable source of the futures consumed by {@code take(...)}
 * @param <V> element type
 * @return a flowable that triggers {@code take(...)} on request and cancels on cancel
 */
public static <V> Flowable<V> takeElements(Supplier<RFuture<V>> callable) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    // Shared by the request hook and the cancel hook.
    // NOTE(review): assumes take(...) stores the in-flight future here - confirm against take().
    AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            take(callable, p, counter, futureRef);
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // The reference is still empty if cancel arrives before any future was stored.
            RFuture<V> future = futureRef.get();
            if (future != null) {
                future.cancel(true);
            }
        }
    });
}
/**
 * Returns a consumer that appends every received {@code long} (boxed) to {@code list}.
 *
 * @param list destination of the received values
 * @return a consumer adding each value to {@code list}
 */
public static LongConsumer addLongTo(final List<Long> list) {
    return value -> list.add(value);
}
/**
 * Returns a consumer that prints {@code prefix} followed by the received value to
 * {@code System.out}, one line per value.
 *
 * @param prefix text printed in front of each value
 * @return a consumer printing each value
 */
public static LongConsumer printLong(final String prefix) {
    return value -> System.out.println(prefix + value);
}
/**
 * Decodes the image on the IO scheduler and then, on the Android main thread, shares the
 * resulting local path to QZone or QQ depending on {@code platform}.
 *
 * <p>{@code listener.shareRequest()} is called whenever the subscriber requests items. On
 * failure the activity is finished and the error is reported through
 * {@code listener.shareFailure}.
 */
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    // Producer: emits the decoded image path as the only item, then completes.
    final FlowableOnSubscribe<String> decodeTask = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    };

    Flowable.create(decodeTask, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(requested -> listener.shareRequest())
            .subscribe(localPath -> {
                if (platform != SharePlatform.QZONE) {
                    shareToQQForImage(localPath, activity, listener);
                } else {
                    shareToQzoneForImage(localPath, activity, listener);
                }
            }, throwable -> {
                activity.finish();
                listener.shareFailure(new Exception(throwable));
            });
}
/**
 * Adapts a future-producing {@code supplier} to a {@link Flowable}.
 *
 * <p>On every downstream request the supplier is called. A failure of the call or of the
 * future is forwarded as {@code onError}; a non-null result is emitted and the stream is
 * then completed. When the downstream cancels, the most recently obtained future is
 * cancelled.
 *
 * <p>The cancel hook is attached to the returned chain. Calling {@code p.doOnCancel(...)}
 * and discarding its result only builds an unused decorator, so the hook would never run.
 *
 * @param supplier produces the future to wait for
 * @param <R> result type
 * @return a flowable emitting the future's result
 */
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
    ReplayProcessor<R> p = ReplayProcessor.create();
    // Hands the in-flight future from the request hook over to the cancel hook.
    final java.util.concurrent.atomic.AtomicReference<RFuture<R>> futureRef =
            new java.util.concurrent.atomic.AtomicReference<RFuture<R>>();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) throws Exception {
            RFuture<R> future;
            try {
                future = supplier.call();
            } catch (Exception e) {
                p.onError(e);
                return;
            }
            futureRef.set(future);

            future.onComplete((res, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                // A null result produces no item; the stream just completes.
                if (res != null) {
                    p.onNext(res);
                }
                p.onComplete();
            });
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // Nothing to cancel if the downstream cancels before requesting.
            RFuture<R> future = futureRef.get();
            if (future != null) {
                future.cancel(true);
            }
        }
    });
}
/**
 * Publishes elements read one at a time through {@code instance.getAsync(index)}.
 *
 * <p>A request of {@code n} reads up to {@code n} elements sequentially, moving the index
 * up or down after each non-null value. A {@code null} value completes the stream and a
 * failed read is forwarded as {@code onError}.
 *
 * @param startIndex index of the first element to read
 * @param forward {@code true} to increment the index after each element, {@code false} to
 *        decrement it
 * @return publisher driven by downstream requests
 */
private Publisher<V> iterator(int startIndex, boolean forward) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        private int currentIndex = startIndex;

        @Override
        public void accept(long n) throws Exception {
            instance.getAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                if (value == null) {
                    // No element at this index: treated as the end of the data.
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                if (forward) {
                    currentIndex++;
                } else {
                    currentIndex--;
                }

                if (n - 1 == 0) {
                    // The requested amount has been delivered.
                    return;
                }
                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Report the failure downstream instead of only printing it, so the
                    // subscriber is not left waiting without a terminal signal.
                    p.onError(e1);
                }
            });
        }
    });
}
/**
 * Exposes topic messages of the given type as a {@link Flowable}.
 *
 * <p>Each downstream request of {@code n} registers a listener that forwards messages and,
 * after {@code n} messages, removes itself and completes the stream. A failed registration
 * is forwarded as {@code onError}. When the downstream cancels, every listener registered
 * so far is removed by id.
 *
 * <p>The cancel hook is attached to the returned chain. Calling {@code p.doOnCancel(...)}
 * and discarding its result only builds an unused decorator, so the listeners would never
 * be removed on cancel.
 *
 * @param type message class passed to {@code topic.addListenerAsync}
 * @param <M> message type
 * @return flowable of received messages
 */
public <M> Flowable<M> getMessages(Class<M> type) {
    ReplayProcessor<M> p = ReplayProcessor.create();
    // Ids of successfully registered listeners that still have to be removed on cancel.
    final java.util.Queue<Integer> listenerIds =
            new java.util.concurrent.ConcurrentLinkedQueue<Integer>();
    final java.util.concurrent.atomic.AtomicBoolean cancelled =
            new java.util.concurrent.atomic.AtomicBoolean();
    final Runnable removeRegistered = () -> {
        Integer id;
        while ((id = listenerIds.poll()) != null) {
            topic.removeListenerAsync(id);
        }
    };

    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    p.onNext(msg);
                    if (counter.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        p.onComplete();
                    }
                }
            });
            t.onComplete((id, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                listenerIds.add(id);
                // The registration may finish after the downstream has already cancelled;
                // remove the listener right away in that case.
                if (cancelled.get()) {
                    removeRegistered.run();
                }
            });
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            cancelled.set(true);
            removeRegistered.run();
        }
    });
}
/**
 * Publishes the queue's elements read one at a time through
 * {@code queue.getValueAsync(index)}, starting at index 0.
 *
 * <p>A request of {@code n} reads up to {@code n} elements sequentially. A {@code null}
 * value completes the stream and a failed read is forwarded as {@code onError}.
 *
 * @return publisher driven by downstream requests
 */
public Publisher<V> iterator() {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        private int currentIndex = 0;

        @Override
        public void accept(long n) throws Exception {
            queue.getValueAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                if (value == null) {
                    // No element at this index: treated as the end of the data.
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                currentIndex++;

                if (n - 1 == 0) {
                    // The requested amount has been delivered.
                    return;
                }
                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Report the failure downstream instead of only printing it, so the
                    // subscriber is not left waiting without a terminal signal.
                    p.onError(e1);
                }
            });
        }
    });
}
/**
 * Races pause/resume on the downstream observer, then makes its {@code onNext} throw.
 * Verifies that the upstream is cancelled, that the terminal error is a
 * {@code StatusException} caused by {@code OnNextTestException}, and that the requested
 * and produced counts are equal.
 */
@RepeatedTest(2)
public void regularModeWithRacingAndOnErrorOverOnNextTest()
        throws InterruptedException {
    final AtomicLong requestedTotal = new AtomicLong();
    final AtomicLong producedTotal = new AtomicLong();
    final CountDownLatch cancelSignal = new CountDownLatch(1);
    final List<Integer> source = Flowable.range(0, 10000000).toList().blockingGet();

    TestSubscriberProducer<Integer> producer = Flowable.fromIterable(source)
            .doOnCancel(() -> cancelSignal.countDown())
            .hide()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io(), true)
            .doOnNext(ignored -> producedTotal.getAndIncrement())
            .doOnRequest(amount -> requestedTotal.addAndGet(amount))
            .hide()
            .subscribeWith(new TestSubscriberProducer<Integer>());

    TestCallStreamObserver<Integer> downstream =
            new TestCallStreamObserver<Integer>(executorService);
    producer.subscribe(downstream);

    racePauseResuming(downstream, 100);
    downstream.throwOnNext();

    Assertions.assertThat(downstream.awaitTerminal(1, TimeUnit.MINUTES)).isTrue();
    Assertions.assertThat(cancelSignal.await(1, TimeUnit.MINUTES)).isTrue();
    Assertions.assertThat(downstream.e)
            .isExactlyInstanceOf(StatusException.class)
            .hasCauseInstanceOf(OnNextTestException.class);
    Assertions.assertThat(producer).hasFieldOrPropertyWithValue("sourceMode", 0);
    Assertions.assertThat(requestedTotal.get()).isEqualTo(producedTotal.get());
    Assertions.assertThat(downstream.collected).isSubsetOf(source);
    Assertions.assertThat(unhandledThrowable).isEmpty();
}
/**
 * Races pause/resume on the downstream observer, then cancels the producer.
 * Verifies that the cancellation reaches the upstream, that the downstream saw no terminal
 * signal or error, and that the requested count is at most one ahead of the produced count.
 */
@RepeatedTest(2)
public void regularModeWithRacingAndCancellationTest() throws InterruptedException {
    final AtomicLong requestedTotal = new AtomicLong();
    final AtomicLong producedTotal = new AtomicLong();
    final CountDownLatch cancelSignal = new CountDownLatch(1);
    final List<Integer> source = Flowable.range(0, 10000000).toList().blockingGet();

    TestSubscriberProducer<Integer> producer = Flowable.fromIterable(source)
            .hide()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io(), true)
            .doOnNext(ignored -> producedTotal.incrementAndGet())
            .doOnCancel(() -> cancelSignal.countDown())
            .doOnRequest(amount -> requestedTotal.addAndGet(amount))
            .hide()
            .subscribeWith(new TestSubscriberProducer<Integer>());

    TestCallStreamObserver<Integer> downstream =
            new TestCallStreamObserver<Integer>(executorService);
    producer.subscribe(downstream);

    racePauseResuming(downstream, 100);
    producer.cancel();

    Assertions.assertThat(cancelSignal.await(1, TimeUnit.MINUTES)).isTrue();
    Assertions.assertThat(downstream.done.getCount()).isEqualTo(1);
    Assertions.assertThat(downstream.e).isNull();
    Assertions.assertThat(requestedTotal.get())
            .isBetween(producedTotal.get(), producedTotal.get() + 1);
    Assertions.assertThat(producer).hasFieldOrPropertyWithValue("sourceMode", 0);
    Assertions.assertThat(downstream.collected).isSubsetOf(source);
    Assertions.assertThat(unhandledThrowable).isEmpty();
}
/**
 * Creates a {@link Flowable} whose items are fetched in pages sized by downstream demand.
 *
 * <p>Each downstream request of {@code n} calls {@code fetch} with the current position and
 * {@code n}, then advances the position by {@code n}. The fetched flowables are
 * concatenated eagerly in request order. Invalid (non-positive) request amounts are
 * rejected before the position is touched, so they cannot shift later pages.
 *
 * @param fetch given a start position and a requested amount, returns the flowable for
 *        that page
 * @param start position passed to the first fetch
 * @param maxConcurrency forwarded to {@code Flowable.concatEager}
 * @param <T> item type
 * @return a deferred flowable; paging state is created per subscription
 */
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
        final long start, final int maxConcurrency) {
    return Flowable.defer(new Callable<Flowable<T>>() {
        @Override
        public Flowable<T> call() throws Exception {
            // need a ReplaySubject because multiple requests can come
            // through before concatEager has established subscriptions to
            // the subject
            final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
            final AtomicLong position = new AtomicLong(start);
            LongConsumer request = new LongConsumer() {
                @Override
                public void accept(final long n) throws Exception {
                    // Validate first: an invalid amount must not move the position.
                    if (!SubscriptionHelper.validate(n)) {
                        return;
                    }
                    final long pos = position.getAndAdd(n);
                    Flowable<T> flowable;
                    try {
                        flowable = fetch.apply(pos, n);
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        subject.onError(e);
                        return;
                    }
                    // reduce allocations by incorporating the onNext
                    // and onComplete actions into the mutable count
                    // object
                    final Count count = new Count(subject, n);
                    flowable = flowable
                            .doOnNext(count)
                            .doOnComplete(count);
                    subject.onNext(flowable);
                }
            };
            return Flowable
                    .concatEager(subject.serialize()
                            .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128)
                    .doOnRequest(request);
        }
    });
}
/**
 * Shares a web page with a thumbnail.
 *
 * <p>The image is decoded and compressed on the IO scheduler. On the Android main thread a
 * {@code WXMediaMessage} wrapping a {@code WXWebpageObject} is built from the title,
 * summary, target URL and thumbnail bytes, and sent with the {@code "webPage"}
 * transaction. {@code listener.shareRequest()} is called whenever the subscriber requests
 * items. On failure the activity is finished and the error is reported through
 * {@code listener.shareFailure}.
 */
@Override
public void shareMedia(
        final int platform, final String title, final String targetUrl, final String summary,
        final ShareImageObject shareImageObject, final Activity activity, final ShareListener listener) {
    Flowable.create(new FlowableOnSubscribe<byte[]>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<byte[]> emitter) throws Exception {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE));
                // Terminate the stream after its single item; without a terminal signal
                // the subscription stays open indefinitely.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long aLong) {
                    listener.shareRequest();
                }
            })
            .subscribe(new Consumer<byte[]>() {
                @Override
                public void accept(byte[] bytes) {
                    WXWebpageObject webpageObject = new WXWebpageObject();
                    webpageObject.webpageUrl = targetUrl;

                    WXMediaMessage message = new WXMediaMessage(webpageObject);
                    message.title = title;
                    message.description = summary;
                    message.thumbData = bytes;

                    sendMessage(platform, message, buildTransaction("webPage"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    activity.finish();
                    listener.shareFailure(new Exception(throwable));
                }
            });
}
/**
 * Shares an image.
 *
 * <p>The image is decoded to a bitmap and compressed to thumbnail bytes on the IO
 * scheduler. On the Android main thread a {@code WXMediaMessage} carrying a
 * {@code WXImageObject} and the thumbnail is sent with the {@code "image"} transaction.
 * {@code listener.shareRequest()} is called whenever the subscriber requests items. On
 * failure the activity is finished and the error is reported through
 * {@code listener.shareFailure}.
 */
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    Flowable.create(new FlowableOnSubscribe<Pair<Bitmap, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<Bitmap, byte[]>> emitter) throws Exception {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(Pair.create(BitmapFactory.decodeFile(imagePath),
                        ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE)));
                // Terminate the stream after its single item; without a terminal signal
                // the subscription stays open indefinitely.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long aLong) {
                    listener.shareRequest();
                }
            })
            .subscribe(new Consumer<Pair<Bitmap, byte[]>>() {
                @Override
                public void accept(Pair<Bitmap, byte[]> pair) {
                    WXImageObject imageObject = new WXImageObject(pair.first);

                    WXMediaMessage message = new WXMediaMessage();
                    message.mediaObject = imageObject;
                    message.thumbData = pair.second;

                    sendMessage(platform, message, buildTransaction("image"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    activity.finish();
                    listener.shareFailure(new Exception(throwable));
                }
            });
}
/**
 * Decodes and compresses the image on the IO scheduler, then on the Android main thread
 * builds a {@code WeiboMultiMessage} with the image (and the text, when it is not empty)
 * and passes it to {@code sendRequest}.
 *
 * <p>{@code listener.shareRequest()} is called whenever the subscriber requests items. On
 * failure the activity is finished and the error is reported through
 * {@code listener.shareFailure}.
 */
private void shareTextOrImage(final ShareImageObject shareImageObject, final String text,
        final Activity activity, final ShareListener listener) {
    // Producer: emits (image path, compressed bytes) as the only item, then completes.
    final FlowableOnSubscribe<Pair<String, byte[]>> loadImage =
            new FlowableOnSubscribe<Pair<String, byte[]>>() {
                @Override
                public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter)
                        throws Exception {
                    try {
                        final String imagePath = ImageDecoder.decode(activity, shareImageObject);
                        final byte[] imageBytes =
                                ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, TARGET_LENGTH);
                        emitter.onNext(Pair.create(imagePath, imageBytes));
                        emitter.onComplete();
                    } catch (Exception e) {
                        emitter.onError(e);
                    }
                }
            };

    Flowable.create(loadImage, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(requested -> listener.shareRequest())
            .subscribe(pathAndBytes -> {
                final ImageObject image = new ImageObject();
                image.imageData = pathAndBytes.second;
                image.imagePath = pathAndBytes.first;

                final WeiboMultiMessage multiMessage = new WeiboMultiMessage();
                multiMessage.imageObject = image;
                if (!TextUtils.isEmpty(text)) {
                    final TextObject caption = new TextObject();
                    caption.text = text;
                    multiMessage.textObject = caption;
                }

                sendRequest(activity, multiMessage);
            }, throwable -> {
                activity.finish();
                listener.shareFailure(new Exception(throwable));
            });
}
/**
 * Decodes the image on the IO scheduler and then, on the Android main thread, shares the
 * media entry (title, target URL, summary, image path) to QZone or QQ depending on
 * {@code platform}.
 *
 * <p>{@code listener.shareRequest()} is called whenever the subscriber requests items. On
 * failure the activity is finished and the error is reported through
 * {@code listener.shareFailure}.
 */
@Override
public void shareMedia(final int platform, final String title, final String targetUrl,
        final String summary, final ShareImageObject shareImageObject, final Activity activity,
        final ShareListener listener) {
    // Producer: emits the decoded image path as the only item, then completes.
    final FlowableOnSubscribe<String> decodeTask = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    };

    Flowable.create(decodeTask, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(requested -> listener.shareRequest())
            .subscribe(imagePath -> {
                if (platform != SharePlatform.QZONE) {
                    shareToQQForMedia(title, summary, targetUrl, imagePath, activity, listener);
                } else {
                    shareToQZoneForMedia(title, targetUrl, summary, imagePath, activity,
                            listener);
                }
            }, throwable -> {
                activity.finish();
                listener.shareFailure(new Exception(throwable));
            });
}
/**
 * Builds a publisher of keys backed by a {@link ReplayProcessor} that is filled by repeated
 * {@code instance.scanIteratorAsync(...)} calls, started from the downstream request hook.
 *
 * @param entry passed unchanged to {@code scanIteratorAsync}
 * @param pattern passed unchanged to {@code scanIteratorAsync}
 * @param count passed unchanged to {@code scanIteratorAsync}
 * @return the processor decorated with the request hook that drives the scan
 */
private Publisher<String> createKeysIterator(MasterSlaveEntry entry, String pattern, int count) {
ReplayProcessor<String> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
// Client reported by the most recent scan result; handed to the next scan call.
private RedisClient client;
// Values of the first batch; a later batch equal to it ends the stream.
private List<String> firstValues;
// Position passed to the next scan call; -1 marks a position that did not advance.
private long nextIterPos;
// Number of elements still to emit for the current request.
private long currentIndex;
@Override
public void accept(long value) {
// Each request overwrites the remaining count and starts a scan.
currentIndex = value;
nextValues();
}
// Runs one scan call and, from its callback, emits the batch and decides whether to continue.
protected void nextValues() {
instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).onComplete((res, e) -> {
if (e != null) {
p.onError(e);
return;
}
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
// First batch: remember it for the repetition check below.
firstValues = (List<String>) (Object) res.getValues();
} else if (res.getValues().equals(firstValues)) {
// The same batch as the first one came back: complete.
p.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
// The position did not move; flag it so the stream completes after this batch.
nextIterPos = -1;
}
for (Object val : res.getValues()) {
p.onNext((String) val);
currentIndex--;
if (currentIndex == 0) {
// Requested amount delivered: the processor is completed here.
p.onComplete();
return;
}
}
if (nextIterPos == -1) {
p.onComplete();
currentIndex = 0;
}
if (currentIndex == 0) {
return;
}
// More elements are still requested: fetch the next batch.
nextValues();
});
}
});
}
/**
 * Builds a flowable backed by a {@link ReplayProcessor} that is filled by repeated
 * {@code map.scanIteratorAsync(...)} calls, started from the downstream request hook.
 * Each scanned map entry is converted with {@code getValue(entry)} before it is emitted.
 *
 * @return the processor decorated with the request hook that drives the scan
 */
public Flowable<M> create() {
ReplayProcessor<M> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
// Position passed to the next scan call.
private long nextIterPos;
// Client reported by the most recent scan result; handed to the next scan call.
private RedisClient client;
// Elements emitted since the counter was last reset.
private AtomicLong elementsRead = new AtomicLong();
// Set once the scan reported position 0 and tryAgain() returned false.
private boolean finished;
// Set when the emitted count reached the requested amount; cleared by the next request.
private volatile boolean completed;
// Sum of all requested amounts; it is only ever added to in this block.
private AtomicLong readAmount = new AtomicLong();
@Override
public void accept(long value) throws Exception {
readAmount.addAndGet(value);
// Start scanning only if the previous round completed or nothing has been read yet.
if (completed || elementsRead.get() == 0) {
nextValues();
completed = false;
}
};
// Runs one scan call and, from its callback, emits the batch and decides whether to continue.
protected void nextValues() {
map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).onComplete((res, e) -> {
if (e != null) {
p.onError(e);
return;
}
if (finished) {
// Already finished: reset the scan state and ignore this result.
client = null;
nextIterPos = 0;
return;
}
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<Object, Object> entry : res.getMap().entrySet()) {
M val = getValue(entry);
p.onNext(val);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
// Requested amount reached: the processor is completed here.
p.onComplete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
p.onComplete();
}
if (finished || completed) {
return;
}
// More elements are still requested: fetch the next batch.
nextValues();
});
}
});
}
/**
 * Builds a flowable backed by a {@link ReplayProcessor} that is filled by repeated
 * {@code scanIterator(client, position)} calls, started from the downstream request hook.
 * Each scanned value is cast to {@code V} before it is emitted.
 *
 * @return the processor decorated with the request hook that drives the scan
 */
public Flowable<V> create() {
ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
// Position passed to the next scan call.
private long nextIterPos;
// Client reported by the most recent scan result; handed to the next scan call.
private RedisClient client;
// Elements emitted since the counter was last reset.
private AtomicLong elementsRead = new AtomicLong();
// Set once the scan reported position 0 and tryAgain() returned false.
private boolean finished;
// Set when the emitted count reached the requested amount; cleared by the next request.
private volatile boolean completed;
// Sum of all requested amounts; it is only ever added to in this block.
private AtomicLong readAmount = new AtomicLong();
@Override
public void accept(long value) {
readAmount.addAndGet(value);
// Start scanning only if the previous round completed or nothing has been read yet.
if (completed || elementsRead.get() == 0) {
nextValues();
completed = false;
}
}
// Runs one scan call and, from its callback, emits the batch and decides whether to continue.
protected void nextValues() {
scanIterator(client, nextIterPos).onComplete((res, e) -> {
if (e != null) {
p.onError(e);
return;
}
if (finished) {
// Already finished: reset the scan state and ignore this result.
client = null;
nextIterPos = 0;
return;
}
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Object val : res.getValues()) {
p.onNext((V) val);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
// Requested amount reached: the processor is completed here.
p.onComplete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
p.onComplete();
}
if (finished || completed) {
return;
}
// More elements are still requested: fetch the next batch.
nextValues();
});
}
});
}