下面列出了 io.reactivex.Flowable#create() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a reporter from the given builder and immediately wires up its message pipeline.
 *
 * <p>All configuration values are copied from the builder first. The constructor then creates a
 * {@link Flowable} whose source is this instance, routes it through
 * {@code initBackpressurePolicy}, and subscribes a {@code MessageGroupSubscriber} to the grouped
 * stream. Note that {@code this} is handed to {@code Flowable.create} and the pipeline is
 * subscribed before the constructor returns.
 *
 * @param builder source of the sender, metrics, limits, overflow strategy and scheduler
 */
private RxReporter(Builder<M, R> builder) {
this.sender = builder.sender;
this.metrics = builder.metrics;
this.messageTimeoutNanos = builder.messageTimeoutNanos;
this.bufferedMaxMessages = builder.bufferedMaxMessages;
this.pendingMaxMessages = builder.pendingMaxMessages;
this.overflowStrategy = builder.overflowStrategy;
this.scheduler = builder.scheduler;
// MISSING applies no backpressure handling at the source; the policy is added by the call below.
Flowable<MessagePromise<R>> flowable = Flowable.create(this, BackpressureStrategy.MISSING);
// NOTE(review): initBackpressurePolicy presumably uses overflowStrategy/pendingMaxMessages — confirm.
initBackpressurePolicy(flowable)
.observeOn(Schedulers.single())
.groupBy(new MessagePartitioner())
.subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
/**
 * Streams every stored value whose key matches the search string.
 *
 * <p>Keys and values are read by index from {@code keys} and {@code values}. After each entry
 * the emitter is checked for cancellation; a cancelled search stops without signalling
 * completion.
 *
 * @param searchStr       text to look for
 * @param caseInsensitive forwarded to {@code isMatched} together with the key and search text
 * @return a {@link Flowable} of matching values, created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<T> search(final String searchStr, final boolean caseInsensitive) {
    return Flowable.create(emitter -> {
        final int total = size();
        LOG.debug("Code search started: {} ...", searchStr);
        int index = 0;
        while (index < total) {
            if (isMatched(keys.get(index), searchStr, caseInsensitive)) {
                emitter.onNext(values.get(index));
            }
            // Checked after the emission so the entry just visited is still delivered.
            if (emitter.isCancelled()) {
                LOG.debug("Code search canceled: {}", searchStr);
                return;
            }
            index++;
        }
        LOG.debug("Code search complete: {}, memory usage: {}", searchStr, UiUtils.memoryInfo());
        emitter.onComplete();
    }, BackpressureStrategy.LATEST);
}
/**
 * Asks for the given permissions and reports the outcome through a {@link Flowable}.
 *
 * <p>An accepted result is emitted with {@code onNext}; any other result is signalled as an
 * {@code Error} wrapping that result. The stream is not completed by this method.
 *
 * <p>Per the original note: use a request with an empty array only to request all manifest
 * permissions.
 *
 * @param permissions permissions to request
 * @return a {@link Flowable} of permission results, created with {@code BackpressureStrategy.LATEST}
 */
public Flowable<PermissionResult> requestAsFlowable(final List<String> permissions) {
    return Flowable.create(new FlowableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final FlowableEmitter<PermissionResult> emitter) throws Exception {
            runtimePermission
                    .request(permissions)
                    .onResponse(new ResponseCallback() {
                        @Override
                        public void onResponse(PermissionResult result) {
                            if (!result.isAccepted()) {
                                emitter.onError(new Error(result));
                                return;
                            }
                            emitter.onNext(result);
                        }
                    })
                    .ask();
        }
    }, BackpressureStrategy.LATEST);
}
/**
 * Creates a mapper from a Retrofit service call to a {@link Resource} stream based on Rx's
 * {@link Flowable}.
 * The stream indicates the current state while a REST API call executes (loading, error, or
 * success, with status and message on error).
 *
 * @param remote the call obtained from the Retrofit service
 * @param onSave called after a successful response arrives, to save the response data into the
 *               local database; ignored when {@code null}
 * @param <T>    type of the response
 * @return a {@link Flowable} instance to deal with progress display and error handling
 */
public static <T> Flowable<Resource<T>> createRemoteSourceMapper(@Nullable Single<T> remote,
                                                                 @Nullable PlainConsumer<T> onSave) {
    return Flowable.create(emitter -> {
        // The mapper is only constructed here; it receives the emitter through its constructor.
        new SimpleRemoteSourceMapper<T>(emitter) {
            @Override
            public Single<T> getRemote() {
                return remote;
            }

            @Override
            public void saveCallResult(T data) {
                if (onSave == null) {
                    return;
                }
                onSave.accept(data);
            }
        };
    }, BackpressureStrategy.BUFFER);
}
/**
 * Loads a cached entry and exposes it as a {@link Flowable}.
 *
 * <p>The cache is queried with the MD5 digest of {@code key}. If the subscriber has already
 * cancelled once the lookup returns, nothing is emitted. Otherwise a found entry is emitted
 * followed by completion, and a missing entry is signalled as a {@link NullPointerException}.
 *
 * @param key                  cache key (hashed via {@code getMD5MessageDigest} before lookup)
 * @param type                 type passed to the cache core for the lookup
 * @param backpressureStrategy strategy handed to {@code Flowable.create}
 * @param <T>                  type of the cached payload
 * @return a {@link Flowable} emitting at most one {@code CacheResult}
 */
public <T> Flowable<CacheResult<T>> load2Flowable(final String key, final Type type, BackpressureStrategy backpressureStrategy) {
    return Flowable.create(new FlowableOnSubscribe<CacheResult<T>>() {
        @Override
        public void subscribe(FlowableEmitter<CacheResult<T>> flowableEmitter) throws Exception {
            final CacheResult<T> cached = cacheCore.load(getMD5MessageDigest(key), type);
            if (flowableEmitter.isCancelled()) {
                return;
            }
            if (cached == null) {
                flowableEmitter.onError(new NullPointerException("Not find the key corresponding to the cache"));
                return;
            }
            flowableEmitter.onNext(cached);
            flowableEmitter.onComplete();
        }
    }, backpressureStrategy);
}
/**
 * Builds a {@link Flowable} that sends one record through the given Kafka producer when
 * subscribed.
 *
 * <p>On a send failure the exception is wrapped via {@code wrapException} and signalled as an
 * error. On success, if the return type declares a first type variable, the record metadata is
 * converted to that type and emitted; if conversion yields nothing and the type variable matches
 * the body argument's type, the original {@code value} is emitted instead. The stream then
 * completes.
 *
 * @param context       invocation context, used when wrapping a send failure
 * @param topic         topic of the record
 * @param bodyArgument  argument describing the message body
 * @param kafkaProducer producer used to send the record
 * @param kafkaHeaders  headers of the record
 * @param returnType    declared return type of the intercepted method
 * @param key           record key
 * @param value         record value
 * @param timestamp     record timestamp
 * @return the send {@link Flowable}, created with {@code BackpressureStrategy.ERROR}
 */
private Flowable buildSendFlowable(
        MethodInvocationContext<Object, Object> context,
        String topic,
        Argument bodyArgument,
        Producer kafkaProducer,
        List<Header> kafkaHeaders,
        ReturnType<Object> returnType,
        Object key,
        Object value,
        Long timestamp) {
    ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
    Optional<Argument<?>> firstTypeVariable = returnType.getFirstTypeVariable();
    return Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
        if (exception != null) {
            emitter.onError(wrapException(context, exception));
            return;
        }
        if (firstTypeVariable.isPresent()) {
            Argument<?> argument = firstTypeVariable.get();
            Optional<?> converted = conversionService.convert(metadata, argument);
            if (converted.isPresent()) {
                emitter.onNext(converted.get());
            } else if (argument.getType() == bodyArgument.getType()) {
                // Metadata could not be converted; fall back to echoing the sent value.
                emitter.onNext(value);
            }
        }
        emitter.onComplete();
    }), BackpressureStrategy.ERROR);
}
/**
 * Fetches a single commit and streams the files it contains.
 *
 * <p>The commit is looked up when the {@link Flowable} is subscribed; each of its files is
 * emitted in turn and the stream then completes.
 *
 * @param user repository owner
 * @param repo repository name
 * @param sha  commit identifier
 * @return a {@link Flowable} of the commit's files, created with {@code BackpressureStrategy.BUFFER}
 */
@Override
public Flowable<CommittedFile> getCommittedFiles(String user, String repo, String sha) {
    return Flowable.create(emitter -> {
        gitHbubRepos.getSingleCommit(user, repo, sha)
                .getFiles()
                .forEach(file -> emitter.onNext(file));
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Creates a stream that emits two values and then fails.
 *
 * @param v1 first value to emit
 * @param v2 second value to emit
 * @param e  failure signalled after both values
 * @return an {@link Optional} holding the {@link Flowable}, created with
 *         {@code BackpressureStrategy.ERROR}
 */
@Override
protected Optional<Flowable> createInstanceEmittingAMultipleValuesAndFailure(String v1, String v2,
                                                                             RuntimeException e) {
    return Optional.of(Flowable.<String>create(sink -> {
        sink.onNext(v1);
        sink.onNext(v2);
        sink.onError(e);
    }, BackpressureStrategy.ERROR));
}
/**
 * Simple {@link Flowable} implementation to emit a range of BigInteger values.
 *
 * <p>Both bounds are inclusive. Emission stops early, without a completion signal, once the
 * subscriber has cancelled.
 *
 * @param startValue first value to emit in range (must not be negative)
 * @param endValue final value to emit in range (must not be smaller than {@code startValue})
 * @param ascending direction to iterate through range; when {@code false} the values are
 *     emitted from {@code endValue} down to {@code startValue}
 * @return a {@link Flowable} instance to emit this range of values
 * @throws IllegalArgumentException if {@code startValue} is negative or greater than
 *     {@code endValue}
 */
public static Flowable<BigInteger> range(
        final BigInteger startValue, final BigInteger endValue, final boolean ascending) {
    if (startValue.compareTo(BigInteger.ZERO) < 0) {
        throw new IllegalArgumentException("Negative start index cannot be used");
    } else if (startValue.compareTo(endValue) > 0) {
        throw new IllegalArgumentException(
                "Start index cannot be greater than end index");
    }

    // Both directions share one loop: walk from 'first' towards 'last' in steps of +1 or -1.
    final BigInteger first = ascending ? startValue : endValue;
    final BigInteger last = ascending ? endValue : startValue;
    final BigInteger step = ascending ? BigInteger.ONE : BigInteger.ONE.negate();
    // BigInteger.compareTo returns -1, 0 or 1; the cursor has passed 'last' exactly when the
    // comparison result equals the sign of the step.
    final int passedLast = step.signum();

    return Flowable.create(
            subscriber -> {
                for (BigInteger i = first;
                        i.compareTo(last) != passedLast && !subscriber.isCancelled();
                        i = i.add(step)) {
                    subscriber.onNext(i);
                }
                if (!subscriber.isCancelled()) {
                    subscriber.onComplete();
                }
            },
            BackpressureStrategy.BUFFER);
}
/**
 * Exposes the content of an input stream as a {@link Flowable} of lines decoded as UTF-8.
 *
 * <p>The stream is read when the {@link Flowable} is subscribed and is closed, together with
 * the scanner, once reading ends. Reading stops as soon as the subscriber cancels, so a
 * cancelled subscription no longer keeps consuming the stream.
 *
 * @param inputStream stream to read; closed by this method when reading ends
 * @return a {@link Flowable} of lines, created with {@code BackpressureStrategy.BUFFER}
 */
private Flowable<String> flowableInputStreamScanner(InputStream inputStream) {
    return Flowable.create(subscriber -> {
        try (Scanner scanner = new Scanner(inputStream, "UTF-8")) {
            // Cancellation is tested first so a cancelled subscriber never triggers another
            // (potentially blocking) read on the underlying stream.
            // NOTE(review): hasNext() is token-based while nextLine() is line-based, so trailing
            // blank lines are not emitted; switch to hasNextLine() if they should be.
            while (!subscriber.isCancelled() && scanner.hasNext()) {
                subscriber.onNext(scanner.nextLine());
            }
        }
        subscriber.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Create a flowable of state changes from the specified {@link Cursor}.
 * <p>
 * Note: This method will emit the current state immediately after subscribe.
 * <p>
 * The emitter's {@code onNext} is registered with the cursor through {@code Cursors.forEach},
 * and the returned {@code Cancelable} is cancelled when the subscription is cancelled.
 *
 * @param cursor  cursor to observe
 * @param <State> type of the state held by the cursor
 * @return a {@link Flowable} of states, created with {@code BackpressureStrategy.LATEST}
 */
public static <State> Flowable<State> asFlowable(final Cursor<State> cursor) {
return Flowable.create(emitter -> {
final Cancelable cancelable = Cursors.forEach(cursor, emitter::onNext);
// Tie the cursor registration to the lifetime of the subscription.
emitter.setCancellable(cancelable::cancel);
}, BackpressureStrategy.LATEST);
}
/**
 * Creates a stream that emits two values and then fails.
 *
 * @param v1 first value to emit
 * @param v2 second value to emit
 * @param e  failure signalled after both values
 * @return an {@link Optional} holding the {@link Flowable}, created with
 *         {@code BackpressureStrategy.ERROR}
 */
@Override
protected Optional<Flowable> createInstanceEmittingAMultipleValuesAndFailure(String v1, String v2,
                                                                             RuntimeException e) {
    return Optional.of(Flowable.<String>create(sink -> {
        sink.onNext(v1);
        sink.onNext(v2);
        sink.onError(e);
    }, BackpressureStrategy.ERROR));
}
/**
 * Downloads the page at {@code HtmlConstant.YHDM_M_URL} and maps it to a {@link DilidiliInfo}.
 *
 * <p>The page is fetched with Jsoup when the {@link Flowable} is subscribed. A {@code null}
 * page is signalled as an error; otherwise the mapped object is emitted and the stream
 * completes.
 *
 * @return a {@link Flowable} emitting the parsed info, created with
 *         {@code BackpressureStrategy.BUFFER}
 */
@Override
public Flowable<DilidiliInfo> getDilidiliInfo() {
    return Flowable.create(new FlowableOnSubscribe<DilidiliInfo>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<DilidiliInfo> emitter) throws Exception {
            final Element page = Jsoup.connect(HtmlConstant.YHDM_M_URL).get();
            if (page == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            final DilidiliInfo info = JP.from(page, DilidiliInfo.class);
            // Disabled post-filtering used to run here: it removed schedule items linking to
            // "www.005.tv" and banners with an empty image URL, an empty link, or a link not
            // containing "anime". The parsed result is now emitted unfiltered.
            emitter.onNext(info);
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);
}
/**
 * Runs a piece of work against a Realm instance and exposes it as a {@link Flowable}.
 *
 * <p>On subscription a Realm is opened for the given configuration and handed, together with
 * the stream's emitter, to the supplied consumer. The Realm is closed as soon as the consumer
 * returns. This method itself emits nothing; all signals come from the consumer.
 *
 * @param configuration configuration used to obtain the Realm instance
 * @param emitter       consumer receiving the (emitter, Realm) pair
 * @param <T>           element type of the resulting stream
 * @return a {@link Flowable} created with {@code BackpressureStrategy.BUFFER}
 */
public static <T> Flowable<T> flowableExec(final RealmConfiguration configuration,
                                           final Consumer<Pair<FlowableEmitter, Realm>> emitter) {
    return Flowable.create(new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> downstream) throws Exception {
            try (Realm realm = Realm.getInstance(configuration)) {
                final Pair<FlowableEmitter, Realm> context =
                        new Pair<FlowableEmitter, Realm>(downstream, realm);
                emitter.accept(context);
            }
        }
    }, BackpressureStrategy.BUFFER);
}
/**
 * Wraps a supplier call in a {@link Flowable} that emits the supplied value and completes.
 *
 * <p>Anything thrown by the supplier is signalled as an error. Every signal is skipped once
 * the subscriber has cancelled. When {@code sc} is {@code null} the work is subscribed on the
 * IO scheduler; otherwise {@code sc} is used for both subscription and observation.
 *
 * @param supplier produces the single value to emit
 * @param <T>      type of the emitted value
 * @return the scheduled {@link Flowable}, created with {@code BackpressureStrategy.DROP}
 */
private <T> Flowable<T> create(Supplier<T> supplier) {
    final Flowable<T> source = Flowable.create(emitter -> {
        try {
            final T response = supplier.get();
            if (!emitter.isCancelled()) {
                emitter.onNext(response);
            }
            // Re-checked on purpose: the subscriber may cancel while handling onNext.
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        } catch (Throwable failure) {
            if (!emitter.isCancelled()) {
                emitter.onError(failure);
            }
        }
    }, BackpressureStrategy.DROP);

    return sc == null
            ? source.subscribeOn(Schedulers.io())
            : source.subscribeOn(sc).observeOn(sc);
}
/**
 * Verifies that a source emitting one value followed by a {@link RuntimeException} delivers
 * that error to a test subscriber.
 */
@Test
public void testSubscriberWithException() {
    final TestSubscriber<Integer> subscriber = Flowable.<Integer>create(emitter -> {
        emitter.onNext(1);
        emitter.onError(new RuntimeException());
    }, BackpressureStrategy.LATEST).test();

    subscriber.assertSubscribed();
    subscriber.assertError(RuntimeException.class);
}
/**
 * Creates a {@link Flowable} backed by a {@code ViewClickOnSubscribe} for the given view.
 *
 * @param view view handed to the {@code ViewClickOnSubscribe}
 * @return a {@link Flowable} created with {@code BackpressureStrategy.ERROR}
 */
private static Flowable<View> clicks(@NonNull View view){
    final ViewClickOnSubscribe clickSource = new ViewClickOnSubscribe(view);
    return Flowable.create(clickSource, BackpressureStrategy.ERROR);
}
/**
 * Demo: emits the integers 0..128 (129 values) from a source created with
 * {@code BackpressureStrategy.ERROR}, subscribed on the IO scheduler and observed on the
 * Android main thread by a subscriber that requests {@code Long.MAX_VALUE} items.
 *
 * <p>Every callback is logged. The subscription is stored in {@code mSubscription}. The source
 * never signals completion.
 */
public static void demo5() {
    final Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            mSubscription = s;
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            int value = 0;
            while (value < 129) {
                Log.d(TAG, "emit " + value);
                emitter.onNext(value);
                value++;
            }
        }
    }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(downstream);
}
/**
 * Translates a throwable into an application {@code Error} delivered through a
 * {@link Flowable}.
 *
 * <p>On subscription the stack trace is printed and the throwable is classified by type, most
 * specific checks first. At most one {@code Error} is emitted; nothing is emitted when an
 * {@code ApiException} reports an expired token and an auth callback is registered (the
 * callback is notified instead). The stream is not completed by this method.
 *
 * @param e throwable to translate
 * @return a {@link Flowable} of the mapped error, created with {@code BackpressureStrategy.BUFFER}
 */
public static Flowable<Error> handleError(final Throwable e) {
    return Flowable.create(emitter -> {
        e.printStackTrace();

        if (e instanceof ConnectException || e instanceof UnknownHostException) {
            emitter.onNext(ErrorHelper.netError());
            return;
        }
        if (e instanceof HttpException) {
            // The EOS API surprisingly answers failed queries with HTTP 500. Host-specific
            // handling for that case used to live here and is currently disabled, so every
            // HTTP failure maps to the generic HTTP error.
            emitter.onNext(ErrorHelper.httpError());
            return;
        }
        if (e instanceof SocketTimeoutException) {
            emitter.onNext(ErrorHelper.timeout());
            return;
        }
        if (e instanceof SSLException) {
            emitter.onNext(ErrorHelper.sslError());
            return;
        }
        if (e instanceof MalformedJsonException || e instanceof JSONException
                || e instanceof JsonParseException) {
            emitter.onNext(ErrorHelper.parseError());
            return;
        }
        if (e instanceof ClassCastException) {
            emitter.onNext(ErrorHelper.castError());
            return;
        }
        if (e instanceof ApiException) {
            // The API request itself failed.
            final ApiException apiException = (ApiException) e;
            if (apiException.getError() == ErrorHelper.TOKEN_EXPIRED
                    && RetrofitClient.get().getAuthCallback() != null) {
                RetrofitClient.get().getAuthCallback().onTokenExpired(apiException.getMsg());
            } else {
                emitter.onNext(ErrorHelper.apiError(apiException.getError(), apiException.getMsg()));
            }
            return;
        }

        // Any other IOException, as well as every unknown error, is reported as a parse error.
        emitter.onNext(ErrorHelper.parseError());
    }, BackpressureStrategy.BUFFER);
}
/** Obtain a {@link Flowable} for the PV's write access.
 *
 *  <p>The {@link Flowable} receives <code>true</code> when the PV permits write access.
 *  When the PV does not allow write access, or the PV becomes disconnected,
 *  <code>false</code> is emitted.
 *
 *  <p>The stream is backed by an {@code AccessRightsEventHandler} created for this PV
 *  and uses {@code BackpressureStrategy.LATEST}.
 *
 *  @return {@link Flowable} that receives <code>true</code>/<code>false</code> to indicate write access
 */
public Flowable<Boolean> onAccessRightsEvent()
{
    final AccessRightsEventHandler handler = new AccessRightsEventHandler(this);
    return Flowable.create(handler, BackpressureStrategy.LATEST);
}