类io.reactivex.functions.LongConsumer源码实例Demo

下面列出了 io.reactivex.functions.LongConsumer 的 API 用法示例代码及写法，您也可以点击链接到 GitHub 查看源代码。

源代码1 项目: redisson   文件: ElementsStream.java
/**
 * Creates a {@link Flowable} that pulls elements on demand: every downstream
 * {@code request(n)} starts a {@code take(...)} cycle with a counter
 * initialised to {@code n}, feeding the backing {@link ReplayProcessor}.
 *
 * <p>Cancelling the returned flowable cancels the future currently held in
 * the shared reference handed to {@code take(...)}, if there is one.
 *
 * @param callable supplies the asynchronous operation that yields an element
 * @param <V> element type
 * @return flowable backed by a replay processor filled by {@code take(...)}
 */
public static <V> Flowable<V> takeElements(Supplier<RFuture<V>> callable) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    // Shared by the request and cancel hooks so that cancellation can reach the
    // future that take(...) stored last (take is defined elsewhere; presumably it
    // publishes the in-flight future through this reference -- confirm).
    AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();

    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            take(callable, p, counter, futureRef);
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // The hook has to be part of the returned chain: calling
            // p.doOnCancel(...) and dropping its result attaches nothing.
            RFuture<V> pending = futureRef.get();
            if (pending != null) {
                pending.cancel(true);
            }
        }
    });
}
 
源代码2 项目: rxjava2-extras   文件: Consumers.java
/**
 * Returns a {@link LongConsumer} that appends every value it receives to the
 * supplied list.
 *
 * @param list destination list; each accepted value is added to it
 * @return consumer that records accepted values in {@code list}
 */
public static LongConsumer addLongTo(final List<Long> list) {
    final LongConsumer appender = new LongConsumer() {
        @Override
        public void accept(long value) throws Exception {
            // Box explicitly; the list stores Long objects.
            list.add(Long.valueOf(value));
        }
    };
    return appender;
}
 
源代码3 项目: rxjava2-extras   文件: Consumers.java
/**
 * Returns a {@link LongConsumer} that prints each accepted value to standard
 * output, preceded by the given prefix.
 *
 * @param prefix text written immediately before the value
 * @return consumer that prints {@code prefix + value} on its own line
 */
public static LongConsumer printLong(final String prefix) {
    final LongConsumer printer = new LongConsumer() {
        @Override
        public void accept(long value) throws Exception {
            final String line = prefix + value;
            System.out.println(line);
        }
    };
    return printer;
}
 
源代码4 项目: smart-farmer-android   文件: QQShareInstance.java
/**
 * Decodes the image to a local path and shares it to QZone or QQ depending on
 * {@code platform}.
 *
 * <p>The pipeline subscribes on {@code Schedulers.io()} and observes on
 * {@code AndroidSchedulers.mainThread()}. On failure the activity is finished
 * and the listener is notified with the wrapped error.
 *
 * @param platform target platform; {@code SharePlatform.QZONE} selects QZone
 * @param shareImageObject image to decode
 * @param activity activity used for decoding and sharing
 * @param listener callback notified of request and failure events
 */
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    // Source: decode the image and emit its local path.
    final FlowableOnSubscribe<String> decodeSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception decodeFailure) {
                emitter.onError(decodeFailure);
            }
        }
    };

    // Notifies the listener whenever the subscriber requests items.
    final LongConsumer requestHook = new LongConsumer() {
        @Override
        public void accept(long amount) {
            listener.shareRequest();
        }
    };

    // Success path: hand the decoded path to the platform-specific share call.
    final Consumer<String> shareDecoded = new Consumer<String>() {
        @Override
        public void accept(String localPath) {
            if (platform != SharePlatform.QZONE) {
                shareToQQForImage(localPath, activity, listener);
            } else {
                shareToQzoneForImage(localPath, activity, listener);
            }
        }
    };

    // Failure path: close the activity, then report the wrapped error.
    final Consumer<Throwable> reportFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable cause) {
            activity.finish();
            listener.shareFailure(new Exception(cause));
        }
    };

    Flowable.create(decodeSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(requestHook)
            .subscribe(shareDecoded, reportFailure);
}
 
源代码5 项目: redisson   文件: CommandRxService.java
/**
 * Wraps an asynchronous command in a {@link Flowable}.
 *
 * <p>Each downstream request invokes {@code supplier}; the resulting future's
 * outcome is pushed into the backing {@link ReplayProcessor}: an error is
 * forwarded via {@code onError}, a non-null result via {@code onNext}, followed
 * by {@code onComplete}. Cancelling the returned flowable cancels the most
 * recently started future.
 *
 * @param supplier creates the future for the command; a thrown exception is
 *                 delivered to the subscriber through {@code onError}
 * @param <R> result type
 * @return flowable emitting at most the command's result
 */
@Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
    ReplayProcessor<R> p = ReplayProcessor.create();
    // Shared with the cancel hook so cancellation can reach the started future.
    // Fully qualified to avoid depending on an import this block cannot see.
    java.util.concurrent.atomic.AtomicReference<RFuture<R>> futureRef =
            new java.util.concurrent.atomic.AtomicReference<RFuture<R>>();

    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) throws Exception {
            RFuture<R> future;
            try {
                future = supplier.call();
            } catch (Exception e) {
                p.onError(e);
                return;
            }
            futureRef.set(future);

            future.onComplete((res, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                if (res != null) {
                    p.onNext(res);
                }
                p.onComplete();
            });
        }
    }).doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            // The hook must be chained onto the returned flowable; calling
            // p.doOnCancel(...) and ignoring its result attaches nothing.
            RFuture<R> pending = futureRef.get();
            if (pending != null) {
                pending.cancel(true);
            }
        }
    });
}
 
源代码6 项目: redisson   文件: RedissonListRx.java
/**
 * Creates a publisher that walks the list one index at a time, starting at
 * {@code startIndex}.
 *
 * <p>For a request of {@code n}, elements are fetched asynchronously one after
 * another via {@code instance.getAsync(index)}. A {@code null} value completes
 * the stream; any failure, including one thrown while starting the next fetch,
 * is delivered through {@code onError}.
 *
 * @param startIndex index of the first element to fetch
 * @param forward {@code true} to step to higher indexes, {@code false} to lower
 * @return publisher backed by a replay processor
 */
private Publisher<V> iterator(int startIndex, boolean forward) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        // Index of the next element to fetch.
        private int currentIndex = startIndex;

        @Override
        public void accept(long n) throws Exception {
            instance.getAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                if (value == null) {
                    // Nothing at this index: finish the stream.
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                currentIndex += forward ? 1 : -1;

                if (n - 1 == 0) {
                    // This request has been fully served.
                    return;
                }
                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Surface the failure to the subscriber instead of only
                    // printing it, which would leave the stream hanging.
                    p.onError(e1);
                }
            });
        }
    });
}
 
源代码7 项目: redisson   文件: RedissonTopicRx.java
/**
 * Returns a {@link Flowable} of messages of the given type received on {@code topic}.
 *
 * <p>Each downstream request of {@code n} registers a new listener on
 * {@code topic}. The listener forwards every message into the backing
 * {@link ReplayProcessor}; once its counter reaches zero it removes itself and
 * completes the processor.
 *
 * @param type class of the messages to listen for
 * @param <M> message type
 * @return flowable backed by a replay processor fed by the topic listener
 */
public <M> Flowable<M> getMessages(Class<M> type) {
    ReplayProcessor<M> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            // Number of messages this request still expects.
            AtomicLong counter = new AtomicLong(n);
            RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M msg) {
                    p.onNext(msg);
                    // Request satisfied: unregister this listener and finish the stream.
                    if (counter.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        p.onComplete();
                    }
                }
            });
            // Runs once the listener registration has finished; id is the value it produced.
            t.onComplete((id, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                
                // NOTE(review): the Flowable returned by doOnCancel is discarded here.
                // RxJava operators normally only take effect on the instance they
                // return, so this removal hook may never run -- confirm and, if so,
                // chain the hook onto the flowable returned by getMessages.
                p.doOnCancel(new Action() {
                    @Override
                    public void run() throws Exception {
                        topic.removeListenerAsync(id);
                    }
                });
            });
        }
    });
}
 
源代码8 项目: redisson   文件: RedissonTransferQueueRx.java
/**
 * Creates a publisher that walks the queue one index at a time, starting at
 * index 0.
 *
 * <p>For a request of {@code n}, values are fetched asynchronously one after
 * another via {@code queue.getValueAsync(index)}. A {@code null} value
 * completes the stream; any failure, including one thrown while starting the
 * next fetch, is delivered through {@code onError}.
 *
 * @return publisher backed by a replay processor
 */
public Publisher<V> iterator() {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        // Index of the next value to fetch.
        private int currentIndex = 0;

        @Override
        public void accept(long n) throws Exception {
            queue.getValueAsync(currentIndex).onComplete((value, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                if (value == null) {
                    // Nothing at this index: finish the stream.
                    p.onComplete();
                    return;
                }

                p.onNext(value);
                currentIndex++;

                if (n - 1 == 0) {
                    // This request has been fully served.
                    return;
                }
                try {
                    accept(n - 1);
                } catch (Exception e1) {
                    // Surface the failure to the subscriber instead of only
                    // printing it, which would leave the stream hanging.
                    p.onError(e1);
                }
            });
        }

    });
}
 
/**
 * Streams a large list of integers through a {@code TestSubscriberProducer} into a
 * {@code TestCallStreamObserver}, then calls {@code downstream.throwOnNext()} and
 * verifies that the downstream terminates with a {@code StatusException} caused by
 * an {@code OnNextTestException} and that the upstream is cancelled.
 *
 * <p>Requested and produced element counts are tracked with {@code doOnRequest}
 * and {@code doOnNext}; the test asserts that they are equal at the end.
 *
 * @throws InterruptedException if interrupted while waiting on the cancellation latch
 */
@RepeatedTest(2)
public void regularModeWithRacingAndOnErrorOverOnNextTest()
    throws InterruptedException {
    // Total amount requested from upstream, accumulated in doOnRequest.
    final AtomicLong requested = new AtomicLong();
    // Number of elements that passed through doOnNext.
    final AtomicLong produced = new AtomicLong();
    // Released when the source observes cancellation.
    final CountDownLatch cancellationLatch = new CountDownLatch(1);
    List<Integer> integers = Flowable.range(0, 10000000)
                                     .toList()
                                     .blockingGet();

    TestSubscriberProducer<Integer> producer = Flowable.fromIterable(integers)
                                                       .doOnCancel(new Action() {
                                                           @Override
                                                           public void run() {
                                                               cancellationLatch.countDown();
                                                           }
                                                       })
                                                       .hide()
                                                       .subscribeOn(Schedulers.io())
                                                       .observeOn(Schedulers.io(), true)
                                                       .doOnNext(new Consumer<Integer>() {
                                                           @Override
                                                           public void accept(Integer __) {
                                                               produced.getAndIncrement();
                                                           }
                                                       })
                                                       .doOnRequest(new LongConsumer() {
                                                           @Override
                                                           public void accept(long r) {
                                                               requested.addAndGet(r);
                                                           }
                                                       })
                                                       .hide()
                                                       .subscribeWith(new TestSubscriberProducer<Integer>());

    TestCallStreamObserver<Integer> downstream = new TestCallStreamObserver<Integer>(
        executorService);
    producer.subscribe(downstream);

    // Helper defined elsewhere; presumably toggles the downstream's readiness
    // concurrently with emission -- confirm against its definition.
    racePauseResuming(downstream, 100);

    // Presumably makes the downstream throw from onNext from now on -- confirm.
    downstream.throwOnNext();

    Assertions.assertThat(downstream.awaitTerminal(1, TimeUnit.MINUTES)).isTrue();
    Assertions.assertThat(cancellationLatch.await(1, TimeUnit.MINUTES)).isTrue();
    Assertions.assertThat(downstream.e)
              .isExactlyInstanceOf(StatusException.class)
              .hasCauseInstanceOf(OnNextTestException.class);
    Assertions.assertThat(producer).hasFieldOrPropertyWithValue("sourceMode", 0);
    Assertions.assertThat(requested.get()).isEqualTo(produced.get());
    Assertions.assertThat(downstream.collected)
              .isSubsetOf(integers);
    Assertions.assertThat(unhandledThrowable).isEmpty();
}
 
/**
 * Streams a large list of integers through a {@code TestSubscriberProducer} into a
 * {@code TestCallStreamObserver}, cancels the producer, and verifies that the
 * cancellation is observed upstream while the downstream records no error and its
 * {@code done} latch count is still 1.
 *
 * <p>Requested and produced element counts are tracked with {@code doOnRequest}
 * and {@code doOnNext}; the test asserts that the requested amount is between
 * {@code produced} and {@code produced + 1}.
 *
 * @throws InterruptedException if interrupted while waiting on the cancellation latch
 */
@RepeatedTest(2)
public void regularModeWithRacingAndCancellationTest() throws InterruptedException {
    // Total amount requested from upstream, accumulated in doOnRequest.
    final AtomicLong requested = new AtomicLong();
    // Number of elements that passed through doOnNext.
    final AtomicLong produced = new AtomicLong();
    // Released when the doOnCancel hook runs.
    final CountDownLatch cancellationLatch = new CountDownLatch(1);
    List<Integer> integers = Flowable.range(0, 10000000)
                                     .toList()
                                     .blockingGet();

    TestSubscriberProducer<Integer> producer = Flowable.fromIterable(integers)
                                                       .hide()
                                                       .subscribeOn(Schedulers.io())
                                                       .observeOn(Schedulers.io(), true)
                                                       .doOnNext(new Consumer<Integer>() {
                                                           @Override
                                                           public void accept(Integer __) {
                                                               produced.incrementAndGet();
                                                           }
                                                       })
                                                       .doOnCancel(new Action() {
                                                           @Override
                                                           public void run() {
                                                               cancellationLatch.countDown();
                                                           }
                                                       })
                                                       .doOnRequest(new LongConsumer() {
                                                           @Override
                                                           public void accept(long r) {
                                                               requested.addAndGet(r);
                                                           }
                                                       })
                                                       .hide()
                                                       .subscribeWith(new TestSubscriberProducer<Integer>());

    TestCallStreamObserver<Integer> downstream = new TestCallStreamObserver<Integer>(
        executorService);
    producer.subscribe(downstream);

    // Helper defined elsewhere; presumably toggles the downstream's readiness
    // concurrently with emission -- confirm against its definition.
    racePauseResuming(downstream, 100);

    producer.cancel();

    Assertions.assertThat(cancellationLatch.await(1, TimeUnit.MINUTES)).isTrue();
    // The done latch is still at 1 after cancellation.
    Assertions.assertThat(downstream.done.getCount()).isEqualTo(1);
    Assertions.assertThat(downstream.e).isNull();
    Assertions.assertThat(requested.get()).isBetween(produced.get(), produced.get() + 1);
    Assertions.assertThat(producer).hasFieldOrPropertyWithValue("sourceMode", 0);
    Assertions.assertThat(downstream.collected)
              .isSubsetOf(integers);
    Assertions.assertThat(unhandledThrowable).isEmpty();
}
 
/**
 * Creates a {@link Flowable} that fetches its items in pages sized by
 * downstream requests.
 *
 * <p>For every accepted request of {@code n}, {@code fetch} is called with the
 * current position and {@code n}, and the position is advanced by {@code n}.
 * The fetched flowables are concatenated eagerly with the given concurrency.
 * A request rejected by {@code SubscriptionHelper.validate} leaves the position
 * untouched.
 *
 * @param fetch function of (position, count) returning the flowable for that page;
 *              an exception it throws is forwarded through {@code onError}
 * @param start initial position
 * @param maxConcurrency maximum number of fetched flowables subscribed eagerly
 * @param <T> item type
 * @return deferred flowable; each subscription gets its own position and subject
 */
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
        final long start, final int maxConcurrency) {
    return Flowable.defer(new Callable<Flowable<T>>() {
        @Override
        public Flowable<T> call() throws Exception {
            // need a ReplaySubject because multiple requests can come
            // through before concatEager has established subscriptions to
            // the subject
            final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
            final AtomicLong position = new AtomicLong(start);
            LongConsumer request = new LongConsumer() {
                @Override
                public void accept(final long n) throws Exception {
                    // Validate before touching the position so that a rejected
                    // request cannot shift where the next page starts.
                    if (!SubscriptionHelper.validate(n)) {
                        return;
                    }
                    final long pos = position.getAndAdd(n);
                    Flowable<T> flowable;
                    try {
                        flowable = fetch.apply(pos, n);
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        subject.onError(e);
                        return;
                    }
                    // reduce allocations by incorporating the onNext
                    // and onComplete actions into the mutable count
                    // object
                    final Count count = new Count(subject, n);
                    flowable = flowable //
                            .doOnNext(count) //
                            .doOnComplete(count);
                    subject.onNext(flowable);
                }
            };
            return Flowable //
                    .concatEager(subject.serialize() //
                            .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
                    .doOnRequest(request);
        }
    });
}
 
源代码12 项目: smart-farmer-android   文件: WxShareInstance.java
/**
 * Shares a web page link with a thumbnail via WeChat.
 *
 * <p>The image is decoded and compressed into thumbnail bytes on
 * {@code Schedulers.io()}; the message is built and sent on
 * {@code AndroidSchedulers.mainThread()}. On failure the activity is finished
 * and the listener is notified with the wrapped error.
 *
 * @param platform target platform passed to {@code sendMessage}
 * @param title message title
 * @param targetUrl URL of the shared web page
 * @param summary message description
 * @param shareImageObject image used for the thumbnail
 * @param activity activity used for decoding; finished on failure
 * @param listener callback notified of request and failure events
 */
@Override
public void shareMedia(
        final int platform, final String title, final String targetUrl, final String summary,
        final ShareImageObject shareImageObject, final Activity activity, final ShareListener listener) {
    Flowable.create(new FlowableOnSubscribe<byte[]>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<byte[]> emitter) throws Exception {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE));
                // Terminate the one-shot stream, as the other share sources do,
                // so the subscription does not stay open indefinitely.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long aLong) {
                    listener.shareRequest();
                }
            })
            .subscribe(new Consumer<byte[]>() {
                @Override
                public void accept(byte[] bytes) {
                    WXWebpageObject webpageObject = new WXWebpageObject();
                    webpageObject.webpageUrl = targetUrl;

                    WXMediaMessage message = new WXMediaMessage(webpageObject);
                    message.title = title;
                    message.description = summary;
                    message.thumbData = bytes;

                    sendMessage(platform, message, buildTransaction("webPage"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    activity.finish();
                    listener.shareFailure(new Exception(throwable));
                }
            });
}
 
源代码13 项目: smart-farmer-android   文件: WxShareInstance.java
/**
 * Shares an image via WeChat.
 *
 * <p>The image is decoded into a bitmap plus compressed thumbnail bytes on
 * {@code Schedulers.io()}; the message is built and sent on
 * {@code AndroidSchedulers.mainThread()}. On failure the activity is finished
 * and the listener is notified with the wrapped error.
 *
 * @param platform target platform passed to {@code sendMessage}
 * @param shareImageObject image to share
 * @param activity activity used for decoding; finished on failure
 * @param listener callback notified of request and failure events
 */
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    Flowable.create(new FlowableOnSubscribe<Pair<Bitmap, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<Bitmap, byte[]>> emitter) throws Exception {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(Pair.create(BitmapFactory.decodeFile(imagePath),
                        ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE)));
                // Terminate the one-shot stream, as the other share sources do,
                // so the subscription does not stay open indefinitely.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long aLong) {
                    listener.shareRequest();
                }
            })
            .subscribe(new Consumer<Pair<Bitmap,byte[]>>() {
                @Override
                public void accept(Pair<Bitmap, byte[]> pair) {
                    WXImageObject imageObject = new WXImageObject(pair.first);

                    WXMediaMessage message = new WXMediaMessage();
                    message.mediaObject = imageObject;
                    message.thumbData = pair.second;

                    sendMessage(platform, message, buildTransaction("image"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) {
                    activity.finish();
                    listener.shareFailure(new Exception(throwable));
                }
            });
}
 
源代码14 项目: smart-farmer-android   文件: WeiboShareInstance.java
/**
 * Shares an image, optionally accompanied by text, via Weibo.
 *
 * <p>The image is decoded and compressed on {@code Schedulers.io()}; the Weibo
 * message is assembled and sent on {@code AndroidSchedulers.mainThread()}. On
 * failure the activity is finished and the listener is notified with the
 * wrapped error.
 *
 * @param shareImageObject image to decode and attach
 * @param text optional text; attached only when not empty
 * @param activity activity used for decoding and sending; finished on failure
 * @param listener callback notified of request and failure events
 */
private void shareTextOrImage(final ShareImageObject shareImageObject, final String text,
        final Activity activity, final ShareListener listener) {

    // Source: emit the decoded path together with the compressed image bytes.
    final FlowableOnSubscribe<Pair<String, byte[]>> prepareImage =
            new FlowableOnSubscribe<Pair<String, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                final byte[] compressed =
                        ImageDecoder.compress2Byte(decodedPath, TARGET_SIZE, TARGET_LENGTH);
                emitter.onNext(Pair.create(decodedPath, compressed));
                emitter.onComplete();
            } catch (Exception prepareFailure) {
                emitter.onError(prepareFailure);
            }
        }
    };

    // Notifies the listener whenever the subscriber requests items.
    final LongConsumer requestHook = new LongConsumer() {
        @Override
        public void accept(long amount) {
            listener.shareRequest();
        }
    };

    // Success path: build the multi-message and send it.
    final Consumer<Pair<String, byte[]>> sendPrepared = new Consumer<Pair<String, byte[]>>() {
        @Override
        public void accept(Pair<String, byte[]> prepared) {
            ImageObject image = new ImageObject();
            image.imageData = prepared.second;
            image.imagePath = prepared.first;

            WeiboMultiMessage multiMessage = new WeiboMultiMessage();
            multiMessage.imageObject = image;

            final boolean hasText = !TextUtils.isEmpty(text);
            if (hasText) {
                TextObject caption = new TextObject();
                caption.text = text;
                multiMessage.textObject = caption;
            }

            sendRequest(activity, multiMessage);
        }
    };

    // Failure path: close the activity, then report the wrapped error.
    final Consumer<Throwable> reportFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable cause) {
            activity.finish();
            listener.shareFailure(new Exception(cause));
        }
    };

    Flowable.create(prepareImage, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(requestHook)
            .subscribe(sendPrepared, reportFailure);
}
 
源代码15 项目: smart-farmer-android   文件: QQShareInstance.java
/**
 * Decodes the image to a local path and shares a media link to QZone or QQ
 * depending on {@code platform}.
 *
 * <p>The pipeline subscribes on {@code Schedulers.io()} and observes on
 * {@code AndroidSchedulers.mainThread()}. On failure the activity is finished
 * and the listener is notified with the wrapped error.
 *
 * @param platform target platform; {@code SharePlatform.QZONE} selects QZone
 * @param title title of the shared item
 * @param targetUrl URL the shared item points to
 * @param summary summary text of the shared item
 * @param shareImageObject image to decode
 * @param activity activity used for decoding and sharing; finished on failure
 * @param listener callback notified of request and failure events
 */
@Override
public void shareMedia(final int platform, final String title, final String targetUrl,
                       final String summary, final ShareImageObject shareImageObject, final Activity activity,
                       final ShareListener listener) {
    // Source: decode the image and emit its local path.
    final FlowableOnSubscribe<String> decodeSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception decodeFailure) {
                emitter.onError(decodeFailure);
            }
        }
    };

    // Notifies the listener whenever the subscriber requests items.
    final LongConsumer requestHook = new LongConsumer() {
        @Override
        public void accept(long amount) {
            listener.shareRequest();
        }
    };

    // Success path: dispatch to the platform-specific media share call.
    final Consumer<String> shareDecoded = new Consumer<String>() {
        @Override
        public void accept(String imagePath) {
            if (platform != SharePlatform.QZONE) {
                shareToQQForMedia(title, summary, targetUrl, imagePath, activity, listener);
            } else {
                shareToQZoneForMedia(title, targetUrl, summary, imagePath, activity,
                        listener);
            }
        }
    };

    // Failure path: close the activity, then report the wrapped error.
    final Consumer<Throwable> reportFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable cause) {
            activity.finish();
            listener.shareFailure(new Exception(cause));
        }
    };

    Flowable.create(decodeSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnRequest(requestHook)
            .subscribe(shareDecoded, reportFailure);
}
 
源代码16 项目: redisson   文件: RedissonKeysRx.java
/**
 * Creates a publisher of keys obtained by repeatedly calling
 * {@code instance.scanIteratorAsync} for the given entry.
 *
 * <p>Each downstream request sets the remaining-element counter to the requested
 * amount and starts scanning. Scanned values are pushed into the backing
 * {@link ReplayProcessor} until the counter reaches zero or the scan is
 * considered finished, at which point the processor is completed.
 *
 * @param entry entry passed through to {@code scanIteratorAsync}
 * @param pattern key pattern passed through to {@code scanIteratorAsync}
 * @param count count argument passed through to {@code scanIteratorAsync}
 * @return publisher backed by a replay processor
 */
private Publisher<String> createKeysIterator(MasterSlaveEntry entry, String pattern, int count) {
    ReplayProcessor<String> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        // Client returned by the previous scan result; null before the first scan.
        private RedisClient client;
        // Values of the first page scanned at position 0, used to detect a repeat.
        private List<String> firstValues;
        // Position for the next scan call; -1 once the position stopped advancing.
        private long nextIterPos;
        
        // Number of elements still to emit for the current request.
        private long currentIndex;
        
        @Override
        public void accept(long value) {
            // Note: the counter is overwritten, not accumulated, on each request.
            currentIndex = value;
            nextValues();
        }
        
        // Scans one page, emits its values, and recurses while more are wanted.
        protected void nextValues() {
            instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).onComplete((res, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                
                client = res.getRedisClient();
                long prevIterPos = nextIterPos;
                if (nextIterPos == 0 && firstValues == null) {
                    // Double cast converts the result's value list to List<String> unchecked.
                    firstValues = (List<String>) (Object) res.getValues();
                } else if (res.getValues().equals(firstValues)) {
                    // Same page as the very first one: treat the scan as finished.
                    p.onComplete();
                    currentIndex = 0;
                    return;
                }

                nextIterPos = res.getPos();
                if (prevIterPos == nextIterPos) {
                    // Position did not advance; mark the scan as exhausted.
                    nextIterPos = -1;
                }
                for (Object val : res.getValues()) {
                    p.onNext((String) val);
                    currentIndex--;
                    if (currentIndex == 0) {
                        // Requested amount delivered: complete the processor.
                        p.onComplete();
                        return;
                    }
                }
                if (nextIterPos == -1) {
                    p.onComplete();
                    currentIndex = 0;
                }
                
                if (currentIndex == 0) {
                    return;
                }
                nextValues();
            });
        }
    });
}
 
源代码17 项目: redisson   文件: RedissonMapRxIterator.java
/**
 * Creates a {@link Flowable} over the map's entries, filled by repeated
 * {@code map.scanIteratorAsync} calls.
 *
 * <p>Each downstream request adds to the total amount to read. Scanning starts
 * when nothing has been read yet or the previous batch was marked completed,
 * and continues page by page until the amount read reaches the amount requested
 * or the scan position returns to 0 and {@code tryAgain()} is false.
 *
 * @return flowable backed by a replay processor
 */
public Flowable<M> create() {
    ReplayProcessor<M> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {

        // Position for the next scan call.
        private long nextIterPos;
        // Client returned by the previous scan result.
        private RedisClient client;
        // Elements emitted since the counter was last reset.
        private AtomicLong elementsRead = new AtomicLong();
        
        // Set once the scan position returned to 0 and tryAgain() was false.
        private boolean finished;
        // Set when the emitted amount reached the requested amount.
        private volatile boolean completed;
        // Running total of all requested amounts.
        private AtomicLong readAmount = new AtomicLong();
        
        @Override
        public void accept(long value) throws Exception {
            readAmount.addAndGet(value);
            // Start scanning only if no scan loop is presumably in progress:
            // either the last batch completed or nothing has been read yet.
            if (completed || elementsRead.get() == 0) {
                nextValues();
                completed = false;
            }
        };
        
        // Scans one page, emits its entries, and recurses until completed or finished.
        protected void nextValues() {
            map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).onComplete((res, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }

                if (finished) {
                    // Scan already ended: reset the cursor state and emit nothing.
                    client = null;
                    nextIterPos = 0;
                    return;
                }

                client = res.getRedisClient();
                nextIterPos = res.getPos();
                
                for (Entry<Object, Object> entry : res.getMap().entrySet()) {
                    M val = getValue(entry);
                    p.onNext(val);
                    elementsRead.incrementAndGet();
                }
                
                if (elementsRead.get() >= readAmount.get()) {
                    // Requested amount delivered: complete and reset the read counter.
                    p.onComplete();
                    elementsRead.set(0);
                    completed = true;
                    return;
                }
                if (res.getPos() == 0 && !tryAgain()) {
                    finished = true;
                    p.onComplete();
                }
                
                if (finished || completed) {
                    return;
                }
                nextValues();
            });
        }
        
    });
}
 
源代码18 项目: redisson   文件: SetRxIterator.java
/**
 * Creates a {@link Flowable} over the set's values, filled by repeated
 * {@code scanIterator} calls.
 *
 * <p>Each downstream request adds to the total amount to read. Scanning starts
 * when nothing has been read yet or the previous batch was marked completed,
 * and continues page by page until the amount read reaches the amount requested
 * or the scan position returns to 0 and {@code tryAgain()} is false.
 *
 * @return flowable backed by a replay processor
 */
public Flowable<V> create() {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {
        
        // Position for the next scan call.
        private long nextIterPos;
        // Client returned by the previous scan result.
        private RedisClient client;
        // Elements emitted since the counter was last reset.
        private AtomicLong elementsRead = new AtomicLong();
        
        // Set once the scan position returned to 0 and tryAgain() was false.
        private boolean finished;
        // Set when the emitted amount reached the requested amount.
        private volatile boolean completed;
        // Running total of all requested amounts.
        private AtomicLong readAmount = new AtomicLong();
        
        @Override
        public void accept(long value) {
            readAmount.addAndGet(value);
            // Start scanning only if no scan loop is presumably in progress:
            // either the last batch completed or nothing has been read yet.
            if (completed || elementsRead.get() == 0) {
                nextValues();
                completed = false;
            }
        }
        
        // Scans one page, emits its values, and recurses until completed or finished.
        protected void nextValues() {
            scanIterator(client, nextIterPos).onComplete((res, e) -> {
                if (e != null) {
                    p.onError(e);
                    return;
                }
                
                if (finished) {
                    // Scan already ended: reset the cursor state and emit nothing.
                    client = null;
                    nextIterPos = 0;
                    return;
                }

                client = res.getRedisClient();
                nextIterPos = res.getPos();

                for (Object val : res.getValues()) {
                    // Unchecked cast of the scanned value to the element type.
                    p.onNext((V) val);
                    elementsRead.incrementAndGet();
                }
                
                if (elementsRead.get() >= readAmount.get()) {
                    // Requested amount delivered: complete and reset the read counter.
                    p.onComplete();
                    elementsRead.set(0);
                    completed = true;
                    return;
                }
                if (res.getPos() == 0 && !tryAgain()) {
                    finished = true;
                    p.onComplete();
                }
                
                if (finished || completed) {
                    return;
                }
                nextValues();
            });
        }
    });
}
 
 类所在包
 同包方法