io.reactivex.Flowable#create ( )源码实例Demo

下面列出了io.reactivex.Flowable#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: buffer-slayer   文件: RxReporter.java
private RxReporter(Builder<M, R> builder) {
  this.sender = builder.sender;
  this.metrics = builder.metrics;

  this.messageTimeoutNanos = builder.messageTimeoutNanos;
  this.bufferedMaxMessages = builder.bufferedMaxMessages;
  this.pendingMaxMessages = builder.pendingMaxMessages;
  this.overflowStrategy = builder.overflowStrategy;
  this.scheduler = builder.scheduler;

  Flowable<MessagePromise<R>> flowable = Flowable.create(this, BackpressureStrategy.MISSING);
  initBackpressurePolicy(flowable)
      .observeOn(Schedulers.single())
      .groupBy(new MessagePartitioner())
      .subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
 
源代码2 项目: jadx   文件: CodeIndex.java
@Override
public Flowable<T> search(final String searchStr, final boolean caseInsensitive) {
	return Flowable.create(emitter -> {
		int size = size();
		LOG.debug("Code search started: {} ...", searchStr);
		for (int i = 0; i < size; i++) {
			if (isMatched(keys.get(i), searchStr, caseInsensitive)) {
				emitter.onNext(values.get(i));
			}
			if (emitter.isCancelled()) {
				LOG.debug("Code search canceled: {}", searchStr);
				return;
			}
		}
		LOG.debug("Code search complete: {}, memory usage: {}", searchStr, UiUtils.memoryInfo());
		emitter.onComplete();
	}, BackpressureStrategy.LATEST);
}
 
源代码3 项目: RuntimePermission   文件: RxPermissions.java
/**
 * use only request with an empty array to request all manifest permissions
 */
public Flowable<PermissionResult> requestAsFlowable(final List<String> permissions) {
    return Flowable.create(new FlowableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final FlowableEmitter<PermissionResult> emitter) throws Exception {
            runtimePermission
                    .request(permissions)
                    .onResponse(new ResponseCallback() {
                        @Override
                        public void onResponse(PermissionResult result) {
                            if (result.isAccepted()) {
                                emitter.onNext(result);
                            } else {
                                emitter.onError(new Error(result));
                            }
                        }
                    }).ask();
        }
    }, BackpressureStrategy.LATEST);
}
 
源代码4 项目: mvvm-template   文件: RestHelper.java
/**
 * Create a mapper from retrofit service to {@link Resource} with rx's {@link Flowable}
 * To indicate current state while execute an rest api (loading, error, success with status and message if error)
 * @param remote from retrofit service
 * @param onSave will be called after success response come, to save response data into local database
 * @param <T> type of response
 * @return a {@link Flowable} instance to deal with progress showing and error handling
 */
public static <T> Flowable<Resource<T>> createRemoteSourceMapper(@Nullable Single<T> remote,
                                                                 @Nullable PlainConsumer<T> onSave) {
    return Flowable.create(emitter -> {
        new SimpleRemoteSourceMapper<T>(emitter) {

            @Override
            public Single<T> getRemote() {
                return remote;
            }

            @Override
            public void saveCallResult(T data) {
                if (onSave != null) {
                    onSave.accept(data);
                }
            }
        };
    }, BackpressureStrategy.BUFFER);
}
 
源代码5 项目: RxCache   文件: RxCache.java
public <T> Flowable<CacheResult<T>> load2Flowable(final String key, final Type type, BackpressureStrategy backpressureStrategy) {
    return Flowable.create(new FlowableOnSubscribe<CacheResult<T>>() {
        @Override
        public void subscribe(FlowableEmitter<CacheResult<T>> flowableEmitter) throws Exception {
            CacheResult<T> load = cacheCore.load(getMD5MessageDigest(key), type);
            if (!flowableEmitter.isCancelled()) {
                if (load != null) {
                    flowableEmitter.onNext(load);
                    flowableEmitter.onComplete();
                } else {
                    flowableEmitter.onError(new NullPointerException("Not find the key corresponding to the cache"));
                }
            }
        }
    }, backpressureStrategy);
}
 
private Flowable buildSendFlowable(
        MethodInvocationContext<Object, Object> context,
        String topic,
        Argument bodyArgument,
        Producer kafkaProducer,
        List<Header> kafkaHeaders,
        ReturnType<Object> returnType,
        Object key,
        Object value,
        Long timestamp) {
    Flowable returnFlowable;
    ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
    Optional<Argument<?>> firstTypeVariable = returnType.getFirstTypeVariable();
    returnFlowable = Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
        if (exception != null) {
            emitter.onError(wrapException(context, exception));
        } else {
            if (firstTypeVariable.isPresent()) {
                Argument<?> argument = firstTypeVariable.get();
                Optional<?> converted = conversionService.convert(metadata, argument);

                if (converted.isPresent()) {
                    emitter.onNext(converted.get());
                } else if (argument.getType() == bodyArgument.getType()) {
                    emitter.onNext(value);
                }
            }
            emitter.onComplete();
        }
    }), BackpressureStrategy.ERROR);
    return returnFlowable;
}
 
@Override
public Flowable<CommittedFile> getCommittedFiles(String user, String repo, String sha) {
    return Flowable.create(emitter -> {
        gitHbubRepos.getSingleCommit(user, repo, sha).getFiles()
                    .forEach(emitter::onNext);
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);
}
 
@Override
protected Optional<Flowable> createInstanceEmittingAMultipleValuesAndFailure(String v1, String v2,
        RuntimeException e) {
    Flowable<String> stream = Flowable.create(emitter -> {
        emitter.onNext(v1);
        emitter.onNext(v2);
        emitter.onError(e);
    }, BackpressureStrategy.ERROR);
    return Optional.of(stream);
}
 
源代码9 项目: web3sdk   文件: Flowables.java
/**
 * Simple {@link Flowable} implementation to emit a range of BigInteger values.
 *
 * @param startValue first value to emit in range
 * @param endValue final value to emit in range
 * @param ascending direction to iterate through range
 * @return a {@link Flowable} instance to emit this range of values
 */
public static Flowable<BigInteger> range(
        final BigInteger startValue, final BigInteger endValue, final boolean ascending) {
    if (startValue.compareTo(BigInteger.ZERO) < 0) {
        throw new IllegalArgumentException("Negative start index cannot be used");
    } else if (startValue.compareTo(endValue) > 0) {
        throw new IllegalArgumentException(
                "Negative start index cannot be greater then end index");
    }

    if (ascending) {
        return Flowable.create(
                subscriber -> {
                    for (BigInteger i = startValue;
                            i.compareTo(endValue) < 1 && !subscriber.isCancelled();
                            i = i.add(BigInteger.ONE)) {
                        subscriber.onNext(i);
                    }

                    if (!subscriber.isCancelled()) {
                        subscriber.onComplete();
                    }
                },
                BackpressureStrategy.BUFFER);
    } else {
        return Flowable.create(
                subscriber -> {
                    for (BigInteger i = endValue;
                            i.compareTo(startValue) > -1 && !subscriber.isCancelled();
                            i = i.subtract(BigInteger.ONE)) {
                        subscriber.onNext(i);
                    }

                    if (!subscriber.isCancelled()) {
                        subscriber.onComplete();
                    }
                },
                BackpressureStrategy.BUFFER);
    }
}
 
源代码10 项目: carnotzet   文件: DockerLogManager.java
private Flowable<String> flowableInputStreamScanner(InputStream inputStream) {
	return Flowable.create(subscriber -> {
		try (Scanner scanner = new Scanner(inputStream, "UTF-8")) {
			while (scanner.hasNext()) {
				subscriber.onNext(scanner.nextLine());
			}
		}
		subscriber.onComplete();
	}, BackpressureStrategy.BUFFER);
}
 
源代码11 项目: reductor   文件: RxStore.java
/**
 * Create flowable of state changes from specified {@link Cursor}
 * <p>
 * Note: This method will emit current sate immediately after subscribe
 */
public static <State> Flowable<State> asFlowable(final Cursor<State> cursor) {
    return Flowable.create(emitter -> {
        final Cancelable cancelable = Cursors.forEach(cursor, emitter::onNext);
        emitter.setCancellable(cancelable::cancel);
    }, BackpressureStrategy.LATEST);
}
 
@Override
protected Optional<Flowable> createInstanceEmittingAMultipleValuesAndFailure(String v1, String v2,
        RuntimeException e) {
    Flowable<String> stream = Flowable.create(emitter -> {
        emitter.onNext(v1);
        emitter.onNext(v2);
        emitter.onError(e);
    }, BackpressureStrategy.ERROR);
    return Optional.of(stream);
}
 
源代码13 项目: AcgClub   文件: ScheduleMainModel.java
@Override
public Flowable<DilidiliInfo> getDilidiliInfo() {
  return Flowable.create(new FlowableOnSubscribe<DilidiliInfo>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<DilidiliInfo> e) throws Exception {
      Element html = Jsoup.connect(HtmlConstant.YHDM_M_URL).get();
      if (html == null) {
        e.onError(new Throwable("element html is null"));
      } else {
        DilidiliInfo dilidiliInfo = JP.from(html, DilidiliInfo.class);
        /*Iterator<ScheduleWeek> scheudleWeekIterator = dilidiliInfo.getScheduleWeek().iterator();
        while (scheudleWeekIterator.hasNext()) {
          ScheduleWeek scheduleWeek = scheudleWeekIterator.next();
          Iterator<ScheduleWeek.ScheduleItem> scheduleItemIterator = scheduleWeek
              .getScheduleItems().iterator();
          while (scheduleItemIterator.hasNext()) {
            ScheduleWeek.ScheduleItem scheduleItem = scheduleItemIterator.next();
            if (scheduleItem.getAnimeLink().contains("www.005.tv")) {
              scheduleItemIterator.remove();
            }
          }
        }
        Iterator<ScheduleBanner> scheudleBannerIterator = dilidiliInfo.getScheduleBanners()
            .iterator();
        while (scheudleBannerIterator.hasNext()) {
          ScheduleBanner scheudleBanner = scheudleBannerIterator.next();
          if (TextUtils.isEmpty(scheudleBanner.getImgUrl()) |
              TextUtils.isEmpty(scheudleBanner.getAnimeLink()) |
              !scheudleBanner.getAnimeLink().contains("anime")) {
            scheudleBannerIterator.remove();
          }
        }*/
        e.onNext(dilidiliInfo);
        e.onComplete();
      }
    }
  }, BackpressureStrategy.BUFFER);
}
 
源代码14 项目: AcgClub   文件: RxRealmUtils.java
public static <T> Flowable<T> flowableExec(final RealmConfiguration configuration,
    final Consumer<Pair<FlowableEmitter, Realm>> emitter) {
  return Flowable.create(new FlowableOnSubscribe<T>() {
    @Override
    public void subscribe(FlowableEmitter<T> e) throws Exception {
      try (Realm realm = Realm.getInstance(configuration)) {
        emitter.accept(new Pair<FlowableEmitter, Realm>(e, realm));
      }
    }
  }, BackpressureStrategy.BUFFER);
}
 
源代码15 项目: cxf   文件: FlowableRxInvokerImpl.java
private <T> Flowable<T> create(Supplier<T> supplier) {
    Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            try {
                T response = supplier.get();
                if (!emitter.isCancelled()) {
                    emitter.onNext(response);
                }
                
                if (!emitter.isCancelled()) {
                    emitter.onComplete();
                }
            } catch (Throwable e) {
                if (!emitter.isCancelled()) {
                    emitter.onError(e);
                }
            }
        }
    }, BackpressureStrategy.DROP);
    
    if (sc == null) {
        return flowable.subscribeOn(Schedulers.io());
    }
    
    return flowable.subscribeOn(sc).observeOn(sc);
}
 
源代码16 项目: reactive-streams-in-java   文件: RxJavaDemoTest.java
@Test
public void testSubscriberWithException() {
    Flowable<Integer> flowable = Flowable.create(source -> {
        source.onNext(1);
        source.onError(new RuntimeException());
    }, BackpressureStrategy.LATEST);

    TestSubscriber<Integer> ts = flowable.test();

    ts.assertSubscribed();
    ts.assertError(RuntimeException.class);
}
 
源代码17 项目: dapp-wallet-demo   文件: RxBind.java
private static Flowable<View> clicks(@NonNull View view){
    return Flowable.create(new ViewClickOnSubscribe(view), BackpressureStrategy.ERROR);
}
 
源代码18 项目: RxJava2Demo   文件: ChapterSeven.java
public static void demo5() {
    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                Log.d(TAG, "emit " + i);
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR);

    Subscriber<Integer> downstream = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            mSubscription = s;
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);

        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    upstream.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(downstream);
}
 
源代码19 项目: RetrofitGO   文件: RxHelper.java
public static Flowable<Error> handleError(final Throwable e) {
        return Flowable.create(emitter -> {
            e.printStackTrace();
            if (e instanceof ConnectException || e instanceof UnknownHostException) {
                emitter.onNext(ErrorHelper.netError());
            } else if (e instanceof HttpException) {
                emitter.onNext(ErrorHelper.httpError());
                //eos的接口查询失败竟然是500
//                Request request = Objects.requireNonNull(((HttpException) e).response().raw().networkResponse()).request();
//                if(request!=null){
//                    HttpUrl url = request.url();
//                    URI uri = url.uri();
//                    String host = uri.getHost();
//                    if(host.equals(getEosApiHost())){
//                        emitter.onNext(new Error(EOS_ERROR,e.getMessage()));
//                    }else{
//                        emitter.onNext(ErrorHelper.httpError());
//                    }
//                }
            } else if (e instanceof SocketTimeoutException) {
                emitter.onNext(ErrorHelper.timeout());
            } else if (e instanceof SSLException) {
                emitter.onNext(ErrorHelper.sslError());
            } else if (e instanceof MalformedJsonException || e instanceof JSONException ||
                    e instanceof JsonParseException) {
                emitter.onNext(ErrorHelper.parseError());
            } else if (e instanceof ClassCastException) {
                emitter.onNext(ErrorHelper.castError());
            } else if (e instanceof ApiException) {
                // 接口请求失败
                ApiException apiException = (ApiException) e;
                if (apiException.getError() == ErrorHelper.TOKEN_EXPIRED
                        && RetrofitClient.get().getAuthCallback() != null) {
                    RetrofitClient.get().getAuthCallback().onTokenExpired(apiException.getMsg());
                } else {
                    emitter.onNext(ErrorHelper.apiError(apiException.getError(), apiException.getMsg()));
                }
            } else if (e instanceof IOException) {
                emitter.onNext(ErrorHelper.parseError());
            } else {
                // 未知错误
                emitter.onNext(ErrorHelper.parseError());
            }
        }, BackpressureStrategy.BUFFER);
    }
 
源代码20 项目: phoebus   文件: PV.java
/** Obtain {@link Flowable} for PV's write access.
 *
 *  <p>The {@link Flowable} will receive <code>true</code> when the PV permits write access.
 *  When the PV does not allow write access, or the PV becomes disconnected,
 *  <code>false</code> is emitted.
 *
 *  @return {@link Flowable} that receives <code>true</code>/<code>false</code> to indicate write access
 */
public Flowable<Boolean> onAccessRightsEvent()
{
    return Flowable.create(new AccessRightsEventHandler(this), BackpressureStrategy.LATEST);
}